Merge pull request #20798 from taosdata/fix/TS-3025
fix(tsdb/row merger): rewrite row merger and fix tsdb read crash with schemaless
This commit is contained in:
commit
8d7eb07e7f
|
@ -123,12 +123,12 @@ int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema)
|
||||||
void tsdbRowClose(STSDBRowIter *pIter);
|
void tsdbRowClose(STSDBRowIter *pIter);
|
||||||
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
|
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
|
||||||
// SRowMerger
|
// SRowMerger
|
||||||
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
|
|
||||||
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
// int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
|
||||||
void tsdbRowMergerClear(SRowMerger *pMerger);
|
void tsdbRowMergerClear(SRowMerger *pMerger);
|
||||||
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
|
||||||
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
|
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
|
||||||
// TABLEID
|
// TABLEID
|
||||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||||
|
|
|
@ -966,7 +966,8 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int
|
||||||
} else {
|
} else {
|
||||||
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
|
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
|
||||||
if (pColVal->value.nData > pColInfoData->info.bytes) {
|
if (pColVal->value.nData > pColInfoData->info.bytes) {
|
||||||
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes);
|
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData,
|
||||||
|
pColInfoData->info.bytes);
|
||||||
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
||||||
}
|
}
|
||||||
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
|
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
|
||||||
|
@ -1843,7 +1844,8 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
|
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
|
||||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) {
|
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
||||||
|
bool* copied) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
*copied = false;
|
*copied = false;
|
||||||
|
@ -1967,7 +1969,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (pReader->order == TSDB_ORDER_ASC) {
|
if (pReader->order == TSDB_ORDER_ASC) {
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1977,10 +1979,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, &fRow1);
|
tsdbRowMergerAdd(&merge, &fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1997,7 +1999,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
tsdbRowMergerAdd(&merge, pRow, pSchema);
|
tsdbRowMergerAdd(&merge, pRow, pSchema);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2011,7 +2013,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
init = true;
|
init = true;
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2025,10 +2027,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, &fRow1);
|
tsdbRowMergerAdd(&merge, &fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2038,10 +2040,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, &fRow);
|
tsdbRowMergerAdd(&merge, &fRow, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2086,13 +2088,13 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
pBlockScanInfo->lastKey = tsLastBlock;
|
pBlockScanInfo->lastKey = tsLastBlock;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
tsdbRowMerge(&merge, &fRow1);
|
tsdbRowMergerAdd(&merge, &fRow1, NULL);
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(&merge, &pTSRow);
|
code = tsdbRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
@ -2108,10 +2110,9 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
} else { // not merge block data
|
} else { // not merge block data
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2163,7 +2164,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2171,7 +2172,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
tsdbRowMerge(&merge, &fRow1);
|
tsdbRowMergerAdd(&merge, &fRow1, NULL);
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
|
||||||
|
|
||||||
|
@ -2262,7 +2263,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
init = true;
|
init = true;
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2273,10 +2274,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, &fRow1);
|
tsdbRowMergerAdd(&merge, &fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2287,7 +2288,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == ik.ts) {
|
if (minKey == ik.ts) {
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, piRow);
|
tsdbRowMergerAdd(&merge, piRow, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||||
|
@ -2295,7 +2296,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerInit(&merge, piRow, pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, piRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2314,10 +2315,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMerge(&merge, pRow);
|
tsdbRowMergerAdd(&merge, pRow, NULL);
|
||||||
} else {
|
} else {
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
code = tsdbRowMergerInit(&merge, pRow, pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2332,7 +2333,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == k.ts) {
|
if (minKey == k.ts) {
|
||||||
init = true;
|
init = true;
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
code = tsdbRowMergerInit(&merge, pRow, pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2346,11 +2347,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
|
|
||||||
if (minKey == ik.ts) {
|
if (minKey == ik.ts) {
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, piRow);
|
tsdbRowMergerAdd(&merge, piRow, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||||
code = tsdbRowMergerInit(&merge, piRow, pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, piRow, pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2365,10 +2366,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
if (init) {
|
if (init) {
|
||||||
tsdbRowMerge(&merge, &fRow1);
|
tsdbRowMergerAdd(&merge, &fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
init = true;
|
init = true;
|
||||||
code = tsdbRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2379,7 +2380,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (minKey == key) {
|
if (minKey == key) {
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
if (!init) {
|
if (!init) {
|
||||||
code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2387,7 +2388,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (merge.pTSchema == NULL) {
|
if (merge.pTSchema == NULL) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
tsdbRowMerge(&merge, &fRow);
|
tsdbRowMergerAdd(&merge, &fRow, NULL);
|
||||||
}
|
}
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
}
|
}
|
||||||
|
@ -2572,7 +2573,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2776,8 +2777,8 @@ _end:
|
||||||
updateComposedBlockInfo(pReader, el, pBlockScanInfo);
|
updateComposedBlockInfo(pReader, el, pBlockScanInfo);
|
||||||
|
|
||||||
if (pResBlock->info.rows > 0) {
|
if (pResBlock->info.rows > 0) {
|
||||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
|
||||||
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
|
", elapsed time:%.2f ms %s",
|
||||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
pResBlock->info.rows, el, pReader->idStr);
|
pResBlock->info.rows, el, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
@ -3028,8 +3029,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
updateComposedBlockInfo(pReader, el, pScanInfo);
|
updateComposedBlockInfo(pReader, el, pScanInfo);
|
||||||
|
|
||||||
if (pResBlock->info.rows > 0) {
|
if (pResBlock->info.rows > 0) {
|
||||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
|
||||||
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
|
", elapsed time:%.2f ms %s",
|
||||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
pResBlock->info.rows, el, pReader->idStr);
|
pResBlock->info.rows, el, pReader->idStr);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3112,8 +3113,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
updateComposedBlockInfo(pReader, el, pScanInfo);
|
updateComposedBlockInfo(pReader, el, pScanInfo);
|
||||||
|
|
||||||
if (pResBlock->info.rows > 0) {
|
if (pResBlock->info.rows > 0) {
|
||||||
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
|
||||||
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
|
", elapsed time:%.2f ms %s",
|
||||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
pResBlock->info.rows, el, pReader->idStr);
|
pResBlock->info.rows, el, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
@ -3139,7 +3140,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
|
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
LRUHandle* handle = NULL;
|
LRUHandle* handle = NULL;
|
||||||
|
@ -3186,7 +3186,6 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
|
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||||
|
@ -3283,7 +3282,6 @@ static int32_t readRowsCountFromMem(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
STableUidList* pUidList = &pStatus->uidList;
|
STableUidList* pUidList = &pStatus->uidList;
|
||||||
|
@ -3696,7 +3694,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
|
||||||
|
|
||||||
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
tsdbRowMergerAdd(pMerger, pRow, pTSchema);
|
||||||
} else { // column format
|
} else { // column format
|
||||||
tsdbRowMerge(pMerger, pRow);
|
tsdbRowMergerAdd(pMerger, pRow, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3712,7 +3710,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
|
||||||
tsdbRowMerge(pMerger, &fRow);
|
tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||||
rowIndex += step;
|
rowIndex += step;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3790,7 +3788,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
if (next1 == ts) {
|
if (next1 == ts) {
|
||||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
tsdbRowMerge(pMerger, &fRow1);
|
tsdbRowMergerAdd(pMerger, &fRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||||
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||||
|
@ -3846,7 +3844,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
pReader->pSchema = pTSchema;
|
pReader->pSchema = pTSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
code = tsdbRowMergerInit(&merge, pReader->pSchema, ¤t, pTSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3858,12 +3856,12 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
||||||
|
|
||||||
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
|
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
|
||||||
} else { // let's merge rows in file block
|
} else { // let's merge rows in file block
|
||||||
code = tsdbRowMergerInit(&merge, ¤t, pReader->pSchema);
|
code = tsdbRowMergerInit(&merge, NULL, ¤t, pReader->pSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMerge(&merge, pNextRow);
|
tsdbRowMergerAdd(&merge, pNextRow, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, &merge, pReader);
|
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, &merge, pReader);
|
||||||
|
@ -3889,12 +3887,11 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
|
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||||
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema);
|
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
|
||||||
|
int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3913,9 +3910,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
|
||||||
|
|
||||||
int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema);
|
|
||||||
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
|
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3926,7 +3921,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbRowMerge(&merge, piRow);
|
tsdbRowMergerAdd(&merge, piRow, piSchema);
|
||||||
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
|
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
|
||||||
pReader);
|
pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -638,13 +638,17 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
|
||||||
|
|
||||||
// SRowMerger ======================================================
|
// SRowMerger ======================================================
|
||||||
|
|
||||||
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
|
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
SColVal *pColVal = &(SColVal){0};
|
SColVal *pColVal = &(SColVal){0};
|
||||||
STColumn *pTColumn;
|
STColumn *pTColumn;
|
||||||
int32_t iCol, jCol = 0;
|
int32_t iCol, jCol = 0;
|
||||||
|
|
||||||
|
if (NULL == pResTSchema) {
|
||||||
|
pResTSchema = pTSchema;
|
||||||
|
}
|
||||||
|
|
||||||
pMerger->pTSchema = pResTSchema;
|
pMerger->pTSchema = pResTSchema;
|
||||||
pMerger->version = key.version;
|
pMerger->version = key.version;
|
||||||
|
|
||||||
|
@ -712,6 +716,9 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
||||||
STColumn *pTColumn;
|
STColumn *pTColumn;
|
||||||
int32_t iCol, jCol = 1;
|
int32_t iCol, jCol = 1;
|
||||||
|
|
||||||
|
if (NULL == pTSchema) {
|
||||||
|
pTSchema = pMerger->pTSchema;
|
||||||
|
}
|
||||||
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
|
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
|
||||||
|
|
||||||
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
|
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
|
||||||
|
@ -727,23 +734,6 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
||||||
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
|
||||||
|
|
||||||
if (key.version > pMerger->version) {
|
if (key.version > pMerger->version) {
|
||||||
#if 0
|
|
||||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
|
||||||
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
|
|
||||||
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
|
|
||||||
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
|
|
||||||
if (code) return code;
|
|
||||||
|
|
||||||
tColVal->value.nData = pColVal->value.nData;
|
|
||||||
if (pColVal->value.nData) {
|
|
||||||
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
|
|
||||||
}
|
|
||||||
tColVal->flag = 0;
|
|
||||||
} else {
|
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if (!COL_VAL_IS_NONE(pColVal)) {
|
if (!COL_VAL_IS_NONE(pColVal)) {
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
@ -758,7 +748,6 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
||||||
pTColVal->flag = 0;
|
pTColVal->flag = 0;
|
||||||
} else {
|
} else {
|
||||||
tFree(pTColVal->value.pData);
|
tFree(pTColVal->value.pData);
|
||||||
pTColVal->value.pData = NULL;
|
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -789,7 +778,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
|
||||||
pMerger->version = key.version;
|
pMerger->version = key.version;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
@ -840,7 +829,7 @@ int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
void tsdbRowMergerClear(SRowMerger *pMerger) {
|
void tsdbRowMergerClear(SRowMerger *pMerger) {
|
||||||
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
|
||||||
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
|
||||||
|
@ -851,7 +840,7 @@ void tsdbRowMergerClear(SRowMerger *pMerger) {
|
||||||
|
|
||||||
taosArrayDestroy(pMerger->pArray);
|
taosArrayDestroy(pMerger->pArray);
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TSDBKEY key = TSDBROW_KEY(pRow);
|
TSDBKEY key = TSDBROW_KEY(pRow);
|
||||||
|
@ -916,7 +905,7 @@ int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow) {
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
|
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
|
||||||
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
|
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue