feat(tsdb): read complex primary key from buf.

This commit is contained in:
Haojun Liao 2024-03-12 14:06:59 +08:00
parent 3d64b10350
commit bc98f12798
3 changed files with 32 additions and 8 deletions

View File

@ -233,8 +233,8 @@ struct SValue {
union {
int64_t val;
struct {
uint32_t nData;
uint8_t *pData;
uint32_t nData;
};
};
};

View File

@ -3339,9 +3339,18 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
return NULL;
}
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
TSDBKEY key = TSDBROW_KEY(pRow);
int32_t order = pReader->info.order;
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
if (!pReader->pkChecked) {
STsdbRowKey k;
tsdbRowGetKey(pRow, &k);
pReader->pkComparFn = getComparFunc(k.key.pks[0].type, 0);
pReader->pkChecked = true;
}
TSDBKEY key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return NULL;
@ -3519,6 +3528,20 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
return TSDB_CODE_SUCCESS;
}
static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
STsdbRowKey k1 = {0}, k2 = {0};
if (pReader->pkComparFn == NULL) {
ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2));
ASSERT(pReader->pkChecked);
return 0;
}
tsdbRowGetKey(p1, &k1);
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val);
}
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL;
@ -3539,7 +3562,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return TSDB_CODE_SUCCESS;
}
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow)) {
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow) || (pkComp(pReader, &current, pNextRow) != 0)) {
*pResRow = current;
*freeTSRow = false;
return TSDB_CODE_SUCCESS;
@ -3685,7 +3708,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
TSDBKEY ik = TSDBROW_KEY(piRow);
int32_t code = TSDB_CODE_SUCCESS;
if (ik.ts != k.ts) {
if (ik.ts != k.ts || (pkComp(pReader, pRow, piRow) != 0)) {
if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts
code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pResRow, pReader, freeTSRow);
} else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) {
@ -3704,8 +3727,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
}
if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader,
freeTSRow);
return doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pResRow, pReader, freeTSRow);
}
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {

View File

@ -266,7 +266,9 @@ struct STsdbReader {
STsdbReader* innerReader[2];
bool bFilesetDelimited; // duration by duration output
TsdReaderNotifyCbFn notifyFn;
void* notifyParam;
void* notifyParam;
__compar_fn_t pkComparFn;
bool pkChecked;
};
typedef struct SBrinRecordIter {