more code
This commit is contained in:
parent
b07ad168c3
commit
7acc124dfc
|
@ -16,6 +16,7 @@
|
||||||
#include "tsdbDataFReaderWriter.h"
|
#include "tsdbDataFReaderWriter.h"
|
||||||
#include "tsdbFS.h"
|
#include "tsdbFS.h"
|
||||||
#include "tsdbSttFReaderWriter.h"
|
#include "tsdbSttFReaderWriter.h"
|
||||||
|
#include "tsdbUtil.h"
|
||||||
|
|
||||||
#ifndef _TD_TSDB_MERGE_H_
|
#ifndef _TD_TSDB_MERGE_H_
|
||||||
#define _TD_TSDB_MERGE_H_
|
#define _TD_TSDB_MERGE_H_
|
||||||
|
|
|
@ -30,6 +30,7 @@ typedef struct {
|
||||||
SMergeCtx ctx;
|
SMergeCtx ctx;
|
||||||
// config
|
// config
|
||||||
int32_t maxRow;
|
int32_t maxRow;
|
||||||
|
int32_t minRow;
|
||||||
int32_t szPage;
|
int32_t szPage;
|
||||||
int8_t cmprAlg;
|
int8_t cmprAlg;
|
||||||
int64_t cid;
|
int64_t cid;
|
||||||
|
@ -82,6 +83,28 @@ static int32_t tsdbMergeNextRow(SMerger *merger) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) {
|
||||||
|
if (merger->ctx.bData.nRow == 0) return 0;
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t vid = TD_VID(merger->tsdb->pVnode);
|
||||||
|
if (merger->ctx.bData.nRow >= merger->minRow) {
|
||||||
|
// code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData);
|
||||||
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
} else {
|
||||||
|
code = tsdbSttFWriteTSDataBlock(merger->sttWriter, &merger->ctx.bData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
tBlockDataReset(&merger->ctx.bData);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
static int32_t tsdbMergeToData(SMerger *merger) {
|
static int32_t tsdbMergeToData(SMerger *merger) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -91,16 +114,30 @@ static int32_t tsdbMergeToData(SMerger *merger) {
|
||||||
code = tsdbMergeNextRow(merger);
|
code = tsdbMergeNextRow(merger);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (!merger->ctx.pRowInfo) break;
|
if (!merger->ctx.pRowInfo) {
|
||||||
|
code = tsdbMergeToDataWriteTSDataBlock(merger);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!TABLE_SAME_SCHEMA(merger->ctx.bData.suid, merger->ctx.bData.suid, merger->ctx.pRowInfo->suid,
|
||||||
|
merger->ctx.pRowInfo->uid)) {
|
||||||
|
code = tsdbMergeToDataWriteTSDataBlock(merger);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx.pRowInfo, &merger->skmTb);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tBlockDataInit(&merger->ctx.bData, (TABLEID *)merger->ctx.pRowInfo, merger->skmTb.pTSchema, NULL, 0);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid);
|
code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (merger->ctx.bData.nRow >= merger->maxRow) {
|
if (merger->ctx.bData.nRow >= merger->maxRow) {
|
||||||
// code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData);
|
code = tsdbMergeToDataWriteTSDataBlock(merger);
|
||||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
tBlockDataReset(&merger->ctx.bData);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -673,7 +673,24 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData) {
|
int32_t tsdbSttFWriteTSDataBlock(SSttFileWriter *pWriter, SBlockData *pBlockData) {
|
||||||
// TODO
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
SRowInfo rowInfo;
|
||||||
|
rowInfo.suid = pBlockData->suid;
|
||||||
|
for (int32_t i = 0; i < pBlockData->nRow; i++) {
|
||||||
|
rowInfo.uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[i];
|
||||||
|
rowInfo.row = tsdbRowFromBlockData(pBlockData, i);
|
||||||
|
|
||||||
|
code = tsdbSttFWriteTSData(pWriter, &rowInfo);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->config.pTsdb->pVnode), __func__, lino,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue