feat(tsdb): read complex primary key from buf.

This commit is contained in:
Haojun Liao 2024-03-12 16:22:56 +08:00
parent 6189ebabef
commit 0315e5ac63
1 changed files with 76 additions and 53 deletions

View File

@ -42,7 +42,8 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pKey, SArray* pDelList,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pScanInfo); STableBlockScanInfo* pScanInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
@ -79,6 +80,32 @@ static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWi
static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus); static void resetPreFilesetMemTableListIndex(SReaderStatus* pStatus);
static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
STsdbRowKey k1 = {0}, k2 = {0};
if (pReader->pkComparFn == NULL) {
ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2));
ASSERT(pReader->pkChecked);
return 0;
}
tsdbRowGetKey(p1, &k1);
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val);
}
static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) {
if (pReader->pkComparFn == NULL) {
ASSERT(p1->key.ts != TSDBROW_TS(p2));
ASSERT(pReader->pkChecked);
return 0;
}
STsdbRowKey k2 = {0};
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val);
}
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) { int32_t numOfCols) {
pSupInfo->smaValid = true; pSupInfo->smaValid = true;
@ -1503,7 +1530,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsLast = getCurrentKeyInSttBlock(pSttBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
TSDBKEY k = TSDBROW_KEY(pRow); STsdbRowKey k;
tsdbRowGetKey(pRow, &k);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
@ -1522,8 +1551,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = tsLast; minKey = tsLast;
} }
if (minKey > k.ts) { if (minKey > k.key.ts) {
minKey = k.ts; minKey = k.key.ts;
} }
if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
@ -1535,8 +1564,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = tsLast; minKey = tsLast;
} }
if (minKey < k.ts) { if (minKey < k.key.ts) {
minKey = k.ts; minKey = k.key.ts;
} }
if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
@ -1564,7 +1593,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
} }
if (minKey == k.ts) { if (minKey == k.key.ts) {
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
@ -1578,13 +1607,13 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
} else { } else {
if (minKey == k.ts) { if (minKey == k.key.ts) {
STSchema* pTSchema = NULL; STSchema* pTSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
@ -1598,7 +1627,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) { if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code; return code;
} }
@ -1746,8 +1775,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
TSDBKEY k = TSDBROW_KEY(pRow); STsdbRowKey k, ik;
TSDBKEY ik = TSDBROW_KEY(piRow); tsdbRowGetKey(pRow, &k);
tsdbRowGetKey(piRow, &ik);
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
@ -1777,12 +1807,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t minKey = 0; int64_t minKey = 0;
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
minKey = INT64_MAX; // let's find the minimum minKey = INT64_MAX; // let's find the minimum
if (minKey > k.ts) { if (minKey > k.key.ts) {
minKey = k.ts; minKey = k.key.ts;
} }
if (minKey > ik.ts) { if (minKey > ik.key.ts) {
minKey = ik.ts; minKey = ik.key.ts;
} }
if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
@ -1794,12 +1824,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} else { } else {
minKey = INT64_MIN; // let find the maximum ts value minKey = INT64_MIN; // let find the maximum ts value
if (minKey < k.ts) { if (minKey < k.key.ts) {
minKey = k.ts; minKey = k.key.ts;
} }
if (minKey < ik.ts) { if (minKey < ik.key.ts) {
minKey = ik.ts; minKey = ik.key.ts;
} }
if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) {
@ -1834,49 +1864,49 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
} }
if (minKey == ik.ts) { if (minKey == ik.key.ts) {
code = tsdbRowMergerAdd(pMerger, piRow, piSchema); code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
if (minKey == k.ts) { if (minKey == k.key.ts) {
code = tsdbRowMergerAdd(pMerger, pRow, pSchema); code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
} else { } else {
if (minKey == k.ts) { if (minKey == k.key.ts) {
code = tsdbRowMergerAdd(pMerger, pRow, pSchema); code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
if (minKey == ik.ts) { if (minKey == ik.key.ts) {
code = tsdbRowMergerAdd(pMerger, piRow, piSchema); code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -3395,7 +3425,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
} }
} }
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader) { int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey *pCurKey, SArray* pDelList, STsdbReader* pReader) {
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
while (1) { while (1) {
@ -3411,8 +3441,11 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
} }
// ts is not identical, quit // ts is not identical, quit
TSDBKEY k = TSDBROW_KEY(pRow); if (TSDBROW_TS(pRow) != pCurKey->key.ts) {
if (k.ts != ts) { break;
}
if (pkComp1(pReader, pCurKey, pRow) != 0) {
break; break;
} }
@ -3528,25 +3561,14 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t pkComp(STsdbReader* pReader, TSDBROW* p1, TSDBROW* p2) {
STsdbRowKey k1 = {0}, k2 = {0};
if (pReader->pkComparFn == NULL) {
ASSERT(TSDBROW_TS(p1) != TSDBROW_TS(p2));
ASSERT(pReader->pkChecked);
return 0;
}
tsdbRowGetKey(p1, &k1);
tsdbRowGetKey(p2, &k2);
return pReader->pkComparFn(&k1.key.pks[0].val, &k2.key.pks[0].val);
}
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow, int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, TSDBROW* pResRow,
STsdbReader* pReader, bool* freeTSRow) { STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL; TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow; TSDBROW current = *pRow;
STsdbRowKey curKey = {0};
tsdbRowGetKey(&current, &curKey);
{ // if the timestamp of the next valid row has a different ts, return current row directly { // if the timestamp of the next valid row has a different ts, return current row directly
pIter->hasVal = tsdbTbDataIterNext(pIter->iter); pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
@ -3562,7 +3584,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow) || (pkComp(pReader, &current, pNextRow) != 0)) { if (TSDBROW_TS(&current) != TSDBROW_TS(pNextRow) || (pkComp1(pReader, &curKey, pNextRow) != 0)) {
*pResRow = current; *pResRow = current;
*freeTSRow = false; *freeTSRow = false;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3600,7 +3622,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return code; return code;
} }
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, pReader); code = doMergeRowsInBuf(pIter, uid, &curKey, pDelList, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -3621,8 +3643,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
SRow** pTSRow) { SRow** pTSRow) {
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
TSDBKEY k = TSDBROW_KEY(pRow); STsdbRowKey k, ik;
TSDBKEY ik = TSDBROW_KEY(piRow); tsdbRowGetKey(pRow, &k);
tsdbRowGetKey(piRow, &ik);
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) { if (pRow->type == TSDBROW_ROW_FMT) {
@ -3646,13 +3669,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -3663,13 +3686,13 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema); tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }