From bc98f12798560cb9b4790611e6b6f024f076a990 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Mar 2024 14:06:59 +0800 Subject: [PATCH] feat(tsdb): read complex primary key from buf. --- include/common/tdataformat.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 34 ++++++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 4 ++- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 0005e9027e..73e6837475 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -233,8 +233,8 @@ struct SValue { union { int64_t val; struct { - uint32_t nData; uint8_t *pData; + uint32_t nData; }; }; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 8bc3177e63..9eefaf6a98 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -3339,9 +3339,18 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p return NULL; } - TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); - TSDBKEY key = TSDBROW_KEY(pRow); int32_t order = pReader->info.order; + TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); + + if (!pReader->pkChecked) { + STsdbRowKey k; + tsdbRowGetKey(pRow, &k); + + pReader->pkComparFn = getComparFunc(k.key.pks[0].type, 0); + pReader->pkChecked = true; + } + + TSDBKEY key = TSDBROW_KEY(pRow); if (outOfTimeWindow(key.ts, &pReader->info.window)) { pIter->hasVal = false; return NULL; @@ -3519,6 +3528,20 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI return TSDB_CODE_SUCCESS; } +static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) { + STsdbRowKey k1 = {0}, k2 = {0}; + + if (pReader->pkComparFn == NULL) { + ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2)); + ASSERT(pReader->pkChecked); + return 0; + } + + tsdbRowGetKey(p1, &k1); + tsdbRowGetKey(p2, &k2); + return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val); +} + int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; @@ -3539,7 +3562,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return TSDB_CODE_SUCCESS; } - if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow)) { + if (TSDBROW_TS(¤t) != TSDBROW_TS(pNextRow) || (pkComp(pReader, ¤t, pNextRow) != 0)) { *pResRow = current; *freeTSRow = false; return TSDB_CODE_SUCCESS; @@ -3685,7 +3708,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea TSDBKEY ik = TSDBROW_KEY(piRow); int32_t code = TSDB_CODE_SUCCESS; - if (ik.ts != k.ts) { + if (ik.ts != k.ts || (pkComp(pReader, pRow, piRow) != 0)) { if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow); } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { @@ -3704,8 +3727,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, - freeTSRow); + return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow); } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index c27e9ebe04..7966a87500 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -266,7 +266,9 @@ struct STsdbReader { STsdbReader* innerReader[2]; bool bFilesetDelimited; // duration by duration output TsdReaderNotifyCbFn notifyFn; - void* notifyParam; + void* notifyParam; + __compar_fn_t pkComparFn; + bool pkChecked; }; typedef struct SBrinRecordIter {