fix(tsdb): set correct compare func for merge tree.

This commit is contained in:
Haojun Liao 2024-04-19 00:32:41 +08:00
parent 4c5f13873d
commit eb1fed177f
3 changed files with 30 additions and 17 deletions

View File

@ -828,17 +828,22 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const SRBTreeNode *p1, const SRBTre
SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node)); SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - offsetof(SLDataIter, node));
SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node)); SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - offsetof(SLDataIter, node));
TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row); SRowKey rkey1, rkey2;
TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row); tRowGetKeyEx(&pIter1->rInfo.row, &rkey1);
tRowGetKeyEx(&pIter2->rInfo.row, &rkey2);
if (key1.ts < key2.ts) { int32_t ret = tRowKeyCompare(&rkey1, &rkey2);
if (ret < 0) {
return -1; return -1;
} else if (key1.ts > key2.ts) { } else if (ret > 0) {
return 1; return 1;
} else { } else {
if (key1.version < key2.version) { int64_t ver1 = TSDBROW_VERSION(&pIter1->rInfo.row);
int64_t ver2 = TSDBROW_VERSION(&pIter2->rInfo.row);
if (ver1 < ver2) {
return -1; return -1;
} else if (key1.version > key2.version) { } else if (ver1 > ver2) {
return 1; return 1;
} else { } else {
return 0; return 0;

View File

@ -25,15 +25,6 @@
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey)) #define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey))
#define tRowGetKeyEx(_pRow, _pKey) \
do { \
if ((_pRow)->type == TSDBROW_ROW_FMT) { \
tRowGetKey((_pRow)->pTSRow, (_pKey)); \
} else { \
tColRowGetKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \
} \
} while (0)
typedef struct { typedef struct {
bool overlapWithNeighborBlock; bool overlapWithNeighborBlock;
bool hasDupTs; bool hasDupTs;
@ -1619,6 +1610,11 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
doUnpinSttBlock(pSttBlockReader); doUnpinSttBlock(pSttBlockReader);
if (hasVal) { if (hasVal) {
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader); SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
if (IS_VAR_DATA_TYPE(pSttKey->pks[0].type)) {
tsdbInfo("current pk:%s, next pk:%s", pSttKey->pks[0].pData, pNext->pks[0].pData);
}
if (pkCompEx(pSttKey, pNext) != 0) { if (pkCompEx(pSttKey, pNext) != 0) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
*copied = (code == TSDB_CODE_SUCCESS); *copied = (code == TSDB_CODE_SUCCESS);
@ -2311,8 +2307,11 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); if (IS_VAR_DATA_TYPE(pScanInfo->lastProcKey.pks[0].type)) {
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " pk:%s %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pScanInfo->lastProcKey.pks[0].pData, pReader->idStr);
}
int32_t code = int32_t code =
tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pScanInfo, &pScanInfo->lastProcKey, pReader, &copied); tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pScanInfo, &pScanInfo->lastProcKey, pReader, &copied);

View File

@ -38,6 +38,15 @@ extern "C" {
(_k)->ekey.ts = INT64_MIN; \ (_k)->ekey.ts = INT64_MIN; \
} while (0); } while (0);
#define tRowGetKeyEx(_pRow, _pKey) \
do { \
if ((_pRow)->type == TSDBROW_ROW_FMT) { \
tRowGetKey((_pRow)->pTSRow, (_pKey)); \
} else { \
tColRowGetKey((_pRow)->pBlockData, (_pRow)->iRow, (_pKey)); \
} \
} while (0)
typedef enum { typedef enum {
READER_STATUS_SUSPEND = 0x1, READER_STATUS_SUSPEND = 0x1,
READER_STATUS_NORMAL = 0x2, READER_STATUS_NORMAL = 0x2,