fix(tsdb): fix error in reading data from file block
This commit is contained in:
parent
1e88193e57
commit
ecd09059f5
|
@ -171,6 +171,7 @@ void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t start
|
|||
printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||
}
|
||||
|
||||
// startTs += 20;
|
||||
taos_free_result(p);
|
||||
}
|
||||
}
|
||||
|
@ -826,19 +827,19 @@ TEST(clientCase, projection_query_tables) {
|
|||
// }
|
||||
// taos_free_result(pRes);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "use test");
|
||||
taos_free_result(pRes);
|
||||
|
||||
// TAOS_RES* pRes = taos_query(pConn, "select tbname, last(ts) from abc1.stable_1 group by tbname");
|
||||
pRes = taos_query(pConn, "create table st2 (ts timestamp, k int primary key, j varchar(1000)) tags(a int)");
|
||||
taos_free_result(pRes);
|
||||
|
||||
// pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
// taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
||||
pRes = taos_query(pConn, "create table tu using st2 tags(2)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
@ -853,7 +854,7 @@ TEST(clientCase, projection_query_tables) {
|
|||
"ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:"
|
||||
"QWERTYUIOP{}";
|
||||
|
||||
for(int32_t i = 0; i < 10000; ++i) {
|
||||
for(int32_t i = 0; i < 1; ++i) {
|
||||
char str[1024] = {0};
|
||||
sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i);
|
||||
|
||||
|
@ -864,10 +865,10 @@ TEST(clientCase, projection_query_tables) {
|
|||
taos_free_result(px);
|
||||
}
|
||||
|
||||
for(int32_t j = 0; j < 5000; ++j) {
|
||||
for(int32_t j = 0; j < 1; ++j) {
|
||||
start += 20;
|
||||
for (int32_t i = 0; i < 10000; ++i) {
|
||||
createNewTable(pConn, i, 20, start, pstr);
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
createNewTable(pConn, i, 100, start, pstr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
|||
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||
STsdbReader* pReader);
|
||||
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader);
|
||||
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
||||
SRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList,
|
||||
|
@ -65,7 +65,7 @@ static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo*
|
|||
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
|
||||
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
|
||||
STsdbReader* pReader, SRow** pTSRow);
|
||||
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
|
||||
STsdbReader* pReader);
|
||||
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||
STsdbReader* pReader);
|
||||
|
@ -203,7 +203,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
|
|||
|
||||
if (pCols[i].pk) {
|
||||
pSupInfo->pk = pCols[i];
|
||||
pSupInfo->pkSrcSlot = i;
|
||||
pSupInfo->pkSrcSlot = i - 1;
|
||||
pSupInfo->pkDstSlot = pSlotIdList[i];
|
||||
pSupInfo->numOfPks += 1;
|
||||
}
|
||||
|
@ -1504,7 +1504,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
|
||||
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, SRowKey* pKey,
|
||||
SFileBlockDumpInfo* pDumpInfo, bool* copied) {
|
||||
// opt version
|
||||
// 1. it is not a border point
|
||||
|
@ -1516,11 +1516,10 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
|||
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
|
||||
|
||||
SRowKey rowKey, nextRowKey;
|
||||
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey);
|
||||
SRowKey nextRowKey;
|
||||
tColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey);
|
||||
|
||||
if (rowKey.ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, &rowKey, &nextRowKey) != 0)) { // merge is not needed
|
||||
if (pKey->ts != nextRowKey.ts || (pkComp2(pReader->pkComparFn, pKey, &nextRowKey) != 0)) { // merge is not needed
|
||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -1578,6 +1577,14 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl
|
|||
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
|
||||
|
||||
static int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
|
||||
if (p2 == NULL) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (p1 == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (p1->ts < p2->ts) {
|
||||
return -1;
|
||||
} else if (p1->ts > p2->ts) {
|
||||
|
@ -1722,7 +1729,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
minKey = *pfKey;
|
||||
}
|
||||
|
||||
if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) {
|
||||
if (pkCompEx(compFn, pSttKey, &minKey) > 0) {
|
||||
minKey = *pSttKey;
|
||||
}
|
||||
}
|
||||
|
@ -1736,7 +1743,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||
}
|
||||
|
||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||
|
@ -1786,7 +1793,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1803,7 +1810,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key,
|
||||
static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, SRowKey* pKey,
|
||||
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
|
@ -1823,50 +1830,32 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
|
||||
if (dataInDataFile && (!dataInSttFile)) {
|
||||
// no stt file block available, only data block exists
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
} else if ((!dataInDataFile) && dataInSttFile) {
|
||||
// no data in data file exists
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
} else if (pBlockScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) {
|
||||
// opt model for count data in stt file, which is not overlap with data blocks in files.
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
} else {
|
||||
// row in both stt file blocks and data file blocks
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
if (key < pSttKey->ts) { // asc
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (key > pSttKey->ts) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pKey, pSttKey);
|
||||
|
||||
// key == tsLast. ts is equal and the primary key exists
|
||||
if (pSttBlockReader->numOfPks > 0) {
|
||||
int32_t res = pkComp1(pReader, pSttKey, &fRow);
|
||||
if (res > 0) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (res < 0) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
if (ret < 0) { // asc
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
} else if (ret > 0) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
} else { // desc
|
||||
if (key > pSttKey->ts) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (key < pSttKey->ts) {
|
||||
if (ret > 0) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
} else if (ret < 0) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
|
||||
// key == tsLast. ts is equal and the primary key exists
|
||||
if (pReader->suppInfo.numOfPks > 0) {
|
||||
int32_t res = pkComp1(pReader, pSttKey, &fRow);
|
||||
if (res < 0) {
|
||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||
} else if (res > 0) {
|
||||
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// the following for key == sttKey->key.ts
|
||||
|
@ -1879,7 +1868,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
|
@ -1902,7 +1891,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
}
|
||||
|
||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
|
@ -2011,7 +2000,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||
}
|
||||
|
||||
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
|
||||
|
@ -2089,7 +2078,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2333,13 +2322,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
|
|||
return pBlockData->nRow > 0 && (!pDumpInfo->allDumped);
|
||||
}
|
||||
|
||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
|
||||
STsdbReader* pReader) {
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
bool copied = false;
|
||||
|
||||
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
|
||||
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, pKey, pDumpInfo, &copied);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2354,12 +2343,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
}
|
||||
|
||||
if (copied) {
|
||||
// if (pReader->suppInfo.numOfPks == 0) {
|
||||
// pBlockScanInfo->lastProcKey.ts = key;
|
||||
// } else { // todo use deep copy instead of shallow copy
|
||||
// int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1;
|
||||
// tColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey);
|
||||
// }
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
@ -2370,7 +2353,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
|
||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2435,9 +2418,14 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
TSDBROW *pRow = NULL, *piRow = NULL;
|
||||
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped))
|
||||
? pBlockData->aTSKEY[pDumpInfo->rowIndex]
|
||||
: (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
|
||||
|
||||
SRowKey* pKey = &(SRowKey){0};
|
||||
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||
tColRowGetKey(pBlockData, pDumpInfo->rowIndex, pKey);
|
||||
} else {
|
||||
pKey = NULL;
|
||||
}
|
||||
|
||||
if (pBlockScanInfo->iter.hasVal) {
|
||||
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||
}
|
||||
|
@ -2462,7 +2450,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
}
|
||||
|
||||
// files data blocks + stt block
|
||||
return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData);
|
||||
return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, pKey, pBlockScanInfo, pBlockData);
|
||||
}
|
||||
|
||||
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
||||
|
@ -2886,7 +2874,7 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn
|
|||
pInfo->version = pReader->info.verRange.maxVer;
|
||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
||||
|
||||
if (pReader->suppInfo.pk.pk) {
|
||||
if (pReader->suppInfo.numOfPks > 0) {
|
||||
if (IS_NUMERIC_TYPE(pReader->suppInfo.pk.type)) {
|
||||
pInfo->pks[0].val = pBlockInfo->firstPk.val;
|
||||
pInfo->pks[1].val = pBlockInfo->lastPk.val;
|
||||
|
@ -3645,9 +3633,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey *pCurKey, SArra
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
|
||||
SVersionRange* pVerRange, int32_t step) {
|
||||
while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
|
||||
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, SRowKey* pKey, SRowMerger* pMerger,
|
||||
SVersionRange* pVerRange, int32_t step, __compar_fn_t comparFn) {
|
||||
while (rowIndex < pBlockData->nRow && rowIndex >= 0) {
|
||||
SRowKey cur;
|
||||
tColRowGetKey(pBlockData, rowIndex, &cur);
|
||||
if (pkCompEx(comparFn, &cur, pKey) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
|
||||
rowIndex += step;
|
||||
continue;
|
||||
|
@ -3667,21 +3661,20 @@ typedef enum {
|
|||
} CHECK_FILEBLOCK_STATE;
|
||||
|
||||
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo,
|
||||
SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
|
||||
SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, SRowKey* pKey,
|
||||
CHECK_FILEBLOCK_STATE* state) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
SVersionRange* pVerRange = &pReader->info.verRange;
|
||||
bool loadNeighbor = true;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
|
||||
|
||||
*state = CHECK_FILEBLOCK_QUIT;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
|
||||
|
||||
bool loadNeighbor = true;
|
||||
int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
|
||||
*state = CHECK_FILEBLOCK_QUIT;
|
||||
|
||||
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
|
||||
pDumpInfo->rowIndex =
|
||||
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step);
|
||||
pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pVerRange, step, pReader->pkComparFn);
|
||||
if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
|
||||
*state = CHECK_FILEBLOCK_CONT;
|
||||
}
|
||||
|
@ -3690,18 +3683,17 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey, STsdbReader* pReader) {
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
int32_t step = asc ? 1 : -1;
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
SVersionRange* pRange = &pReader->info.verRange;
|
||||
|
||||
pDumpInfo->rowIndex += step;
|
||||
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
|
||||
pDumpInfo->rowIndex =
|
||||
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step);
|
||||
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, pKey, pMerger, pRange, step, pReader->pkComparFn);
|
||||
}
|
||||
|
||||
// all rows are consumed, let's try next file block
|
||||
|
@ -3715,7 +3707,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
break;
|
||||
}
|
||||
|
||||
checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, key, &st);
|
||||
checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, pKey, &st);
|
||||
if (st == CHECK_FILEBLOCK_QUIT) {
|
||||
break;
|
||||
}
|
||||
|
@ -3725,8 +3717,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowKey* pRowKey,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
||||
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
|
||||
|
|
|
@ -163,25 +163,30 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
|
||||
pUidList->tableUidList[j] = idList[j].uid;
|
||||
|
||||
SRowKey* pRowKey = &pScanInfo->lastProcKey;
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
|
||||
int64_t skey = pTsdbReader->info.window.skey;
|
||||
pScanInfo->lastProcKey.ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pRowKey->ts = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = skey;
|
||||
} else {
|
||||
int64_t ekey = pTsdbReader->info.window.ekey;
|
||||
pScanInfo->lastProcKey.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pRowKey->ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = ekey;
|
||||
}
|
||||
|
||||
pScanInfo->lastProcKey.numOfPKs = pTsdbReader->suppInfo.numOfPks;
|
||||
if (pTsdbReader->suppInfo.numOfPks > 0 && IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) {
|
||||
pScanInfo->lastProcKey.pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes);
|
||||
// only handle the first primary key.
|
||||
pRowKey->numOfPKs = pTsdbReader->suppInfo.numOfPks;
|
||||
if (pTsdbReader->suppInfo.numOfPks > 0) {
|
||||
if (IS_VAR_DATA_TYPE(pTsdbReader->suppInfo.pk.type)) {
|
||||
pRowKey->pks[0].pData = taosMemoryCalloc(1, pTsdbReader->suppInfo.pk.bytes);
|
||||
}
|
||||
pRowKey->pks[0].type = pTsdbReader->suppInfo.pk.type;
|
||||
}
|
||||
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
|
||||
pScanInfo->lastProcKey.ts, pTsdbReader->idStr);
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pRowKey->ts,
|
||||
pTsdbReader->idStr);
|
||||
}
|
||||
|
||||
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
|
|
@ -1198,16 +1198,17 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||
|
||||
{ // todo :refactor:
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info;
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
|
||||
SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i);
|
||||
if (pItem->isPk) {
|
||||
SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId);
|
||||
pInfo->pResBlock->info.pks[0].type = pInfoData->info.type;
|
||||
pInfo->pResBlock->info.pks[1].type = pInfoData->info.type;
|
||||
pBlockInfo->pks[0].type = pInfoData->info.type;
|
||||
pBlockInfo->pks[1].type = pInfoData->info.type;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
|
||||
pInfo->pResBlock->info.pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||
pInfo->pResBlock->info.pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue