fix(tsdb): check the schema before merge rows in buffer, and do some refactor.
This commit is contained in:
parent
24524e1a8d
commit
e8f7146a2c
|
@ -1504,6 +1504,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (ps == NULL) {
|
if (ps == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbRowMergerInit(pMerger, ps);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to init row merger, code:%s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t minKey = 0;
|
int64_t minKey = 0;
|
||||||
|
@ -1535,15 +1541,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo remove init
|
|
||||||
bool init = false;
|
|
||||||
|
|
||||||
// ASC: file block ---> last block -----> imem -----> mem
|
// ASC: file block ---> last block -----> imem -----> mem
|
||||||
// DESC: mem -----> imem -----> last block -----> file block
|
// DESC: mem -----> imem -----> last block -----> file block
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
init = true;
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1552,47 +1554,44 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
STSchema* pTSchema = NULL;
|
||||||
if (pSchema == NULL) {
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
|
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
|
if (pTSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
if (init) {
|
}
|
||||||
tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
|
||||||
} else {
|
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
init = true;
|
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
|
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
init = true;
|
STSchema* pTSchema = NULL;
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
if (pSchema == NULL) {
|
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
|
if (pTSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1605,29 +1604,19 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
if (init) {
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1674,7 +1663,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
pBlockScanInfo->lastKey = tsLastBlock;
|
pBlockScanInfo->lastKey = tsLastBlock;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1699,7 +1688,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // not merge block data
|
} else { // not merge block data
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1742,6 +1731,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
if (ps == NULL) {
|
if (ps == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = tsdbRowMergerInit(pMerger, ps);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to init row merger, code:%s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
|
@ -1768,7 +1763,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
}
|
}
|
||||||
// the following for key == tsLast
|
// the following for key == tsLast
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1776,7 +1771,10 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||||
|
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
|
|
||||||
|
@ -1816,15 +1814,22 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
|
||||||
|
STSchema* pSchema = NULL;
|
||||||
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
|
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
if (pSchema == NULL) {
|
if (pSchema == NULL) {
|
||||||
return code;
|
return terrno;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
STSchema* piSchema = NULL;
|
||||||
|
if (piRow->type == TSDBROW_ROW_FMT) {
|
||||||
|
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||||
if (piSchema == NULL) {
|
if (piSchema == NULL) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
|
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
|
||||||
if (pMerger->pArray == NULL) {
|
if (pMerger->pArray == NULL) {
|
||||||
|
@ -1833,6 +1838,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (ps == NULL) {
|
if (ps == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbRowMergerInit(pMerger, ps);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to init row merger, code:%s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t minKey = 0;
|
int64_t minKey = 0;
|
||||||
|
@ -1872,15 +1883,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool init = false;
|
|
||||||
|
|
||||||
// ASC: file block -----> last block -----> imem -----> mem
|
// ASC: file block -----> last block -----> imem -----> mem
|
||||||
// DESC: mem -----> imem -----> last block -----> file block
|
// DESC: mem -----> imem -----> last block -----> file block
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
init = true;
|
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1890,30 +1898,20 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == ik.ts) {
|
if (minKey == ik.ts) {
|
||||||
if (init) {
|
|
||||||
tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
|
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1922,15 +1920,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
if (init) {
|
|
||||||
tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
|
||||||
} else {
|
|
||||||
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
|
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1938,7 +1932,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
init = true;
|
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1951,15 +1944,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == ik.ts) {
|
if (minKey == ik.ts) {
|
||||||
if (init) {
|
|
||||||
tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
|
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1968,29 +1957,22 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
|
||||||
} else {
|
|
||||||
init = true;
|
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
if (!init) {
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
|
||||||
}
|
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2184,6 +2166,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
if (ps == NULL) {
|
if (ps == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbRowMergerInit(pMerger, ps);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("failed to init row merger, code:%s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (copied) {
|
if (copied) {
|
||||||
|
@ -2193,7 +2181,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
|
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3328,16 +3316,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
|
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
|
||||||
if (pTSchema == NULL) {
|
if (pTSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
} else { // column format
|
|
||||||
tsdbRowMergerAdd(pMerger, pRow, NULL);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3473,31 +3460,30 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// start to merge duplicated rows
|
// start to merge duplicated rows
|
||||||
if (current.type == TSDBROW_ROW_FMT) {
|
STSchema* pTSchema = NULL;
|
||||||
// get the correct schema for data in memory
|
if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
|
||||||
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
||||||
if (pTSchema == NULL) {
|
if (pTSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema);
|
code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pTSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
|
STSchema* pTSchema1 = NULL;
|
||||||
|
if (pNextRow->type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
|
||||||
|
pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
|
||||||
if (pTSchema1 == NULL) {
|
if (pTSchema1 == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1);
|
|
||||||
} else { // let's merge rows in file block
|
|
||||||
code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->info.pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMergerAdd(&pReader->status.merger, pNextRow, NULL);
|
code = tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader);
|
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader);
|
||||||
|
@ -3523,15 +3509,22 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
|
||||||
|
STSchema* pSchema = NULL;
|
||||||
|
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||||
|
pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
if (pSchema == NULL) {
|
if (pSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
STSchema* piSchema = NULL;
|
||||||
|
if (piRow->type == TSDBROW_ROW_FMT) {
|
||||||
|
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||||
if (piSchema == NULL) {
|
if (piSchema == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
|
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
|
||||||
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
|
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
|
||||||
|
@ -4898,10 +4891,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
pSnap->pMem = pTsdb->mem;
|
pSnap->pMem = pTsdb->mem;
|
||||||
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
|
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
|
||||||
if (pSnap->pNode == NULL) {
|
if (pSnap->pNode == NULL) {
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSnap->pNode->pQHandle = pReader;
|
pSnap->pNode->pQHandle = pReader;
|
||||||
pSnap->pNode->reseek = reseek;
|
pSnap->pNode->reseek = reseek;
|
||||||
|
|
||||||
|
@ -4912,10 +4905,10 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
pSnap->pIMem = pTsdb->imem;
|
pSnap->pIMem = pTsdb->imem;
|
||||||
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
|
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
|
||||||
if (pSnap->pINode == NULL) {
|
if (pSnap->pINode == NULL) {
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSnap->pINode->pQHandle = pReader;
|
pSnap->pINode->pQHandle = pReader;
|
||||||
pSnap->pINode->reseek = reseek;
|
pSnap->pINode->reseek = reseek;
|
||||||
|
|
||||||
|
@ -4924,18 +4917,16 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
|
|
||||||
// fs
|
// fs
|
||||||
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray);
|
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pSnap->pfSetArray);
|
||||||
if (code) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), pReader->idStr);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// unlock
|
_exit:
|
||||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||||
|
|
||||||
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbError("vgId:%d take read snapshot failed, %s code:%s", TD_VID(pTsdb->pVnode), pReader->idStr, tstrerror(code));
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
*ppSnap = NULL;
|
*ppSnap = NULL;
|
||||||
if (pSnap) {
|
if (pSnap) {
|
||||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||||
|
|
Loading…
Reference in New Issue