diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index cf4392e51c..7295e9849e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -125,6 +125,9 @@ int32_t tsdbRowCompare(const void *p1, const void *p2); int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2); int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2); void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key); +void tsdbColRowGetKey(SBlockData *pBlock, int32_t irow, STsdbRowKey *key); +int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey *pSrc); + // STSDBRowIter int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 834971b087..106269ad5c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -23,7 +23,7 @@ #include "tsimplehash.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) -#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) +#define getCurrentKeyInSttBlock(_r) (&((_r)->currentKey)) typedef struct { bool overlapWithNeighborBlock; @@ -40,9 +40,9 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); -static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); -static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pKey, SArray* pDelList, +static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, + STsdbRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); +static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, STsdbRowKey* pCurKey, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo); @@ -106,6 +106,10 @@ static int32_t pkComp1(STsdbReader* pReader, STsdbRowKey* p1, TSDBROW* p2) { return pReader->pkComparFn(&p1->key.pks[0].val, &k2.key.pks[0].val); } +static int32_t pkComp2(STsdbReader* pReader, STsdbRowKey* p1, STsdbRowKey* p2) { + return pReader->pkComparFn(&p1->key.pks[0].val, &p2->key.pks[0].val); +} + static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) { pSupInfo->smaValid = true; @@ -195,6 +199,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pLReader->order = pReader->info.order; pLReader->window = pReader->info.window; pLReader->verRange = pReader->info.verRange; + pLReader->numOfPks = pReader->numOfPks; pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); @@ -427,6 +432,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->type = pCond->type; pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket + pReader->numOfPks = -1; + pReader->pkChecked = false; code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); if (code != TSDB_CODE_SUCCESS) { @@ -565,7 +572,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN int32_t k = 0; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int32_t step = asc ? 1 : -1; STimeWindow w = pReader->info.window; SBrinRecord* pRecord = NULL; @@ -601,10 +609,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid); STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (ASCENDING_TRAVERSE(pReader->info.order)) { - w.skey = pScanInfo->lastProcKey + step; + + // todo: here we should find the first timestamp that is greater than the lastProcKey + if (asc) { + w.skey = pScanInfo->lastProcKey.key.ts + step; } else { - w.ekey = pScanInfo->lastProcKey + step; + w.ekey = pScanInfo->lastProcKey.key.ts + step; } if (isEmptyQueryTimeWindow(&w)) { @@ -672,10 +682,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN return TSDB_CODE_SUCCESS; } +// todo keep the the last returned key static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) { - int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; +// int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1; pDumpInfo->allDumped = true; - pDumpInfo->lastKey = maxKey + step; +// ASSERT(0); +// pDumpInfo->lastKey.key.ts = maxKey + step; } static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, @@ -933,7 +945,7 @@ static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInf record->count = pBlockInfo->count; } -static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { +static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STsdbRowKey* pLastProcKey) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -1021,7 +1033,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex); if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check dumpedRows = pReader->resBlockInfo.capacity; - } else if (dumpedRows <= 0) { // no qualified rows in current data block, abort directly. + } else if (dumpedRows <= 0) { // no qualified rows in current data block, quit directly. setBlockAllDumped(pDumpInfo, pReader->info.window.ekey, pReader->info.order); return TSDB_CODE_SUCCESS; } @@ -1082,11 +1094,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { pResBlock->info.rows = dumpedRows; pDumpInfo->rowIndex += step * dumpedRows; + tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, pLastProcKey); + // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + + // the remain data has out of query time window, ignore current block if (outOfTimeWindow(ts, &pReader->info.window)) { - // the remain data has out of query time window, ignore current block setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } } else { @@ -1316,6 +1331,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* SBrinRecord pRecord; blockInfoToRecord(&pRecord, pBlockInfo); + // has duplicated ts of different version in this block pInfo->hasDupTs = (pBlockInfo->numRow > pBlockInfo->count) || (pBlockInfo->count <= 0); pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pRecord, pReader->info.order); @@ -1399,10 +1415,19 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB *copied = false; bool asc = (pReader->info.order == TSDB_ORDER_ASC); if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) { - int32_t step = pReader->info.order == TSDB_ORDER_ASC ? 1 : -1; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; - int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; - if (nextKey != key) { // merge is not needed + STsdbRowKey rowKey, nextRowKey; + tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, &rowKey); + tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex + step, &nextRowKey); + + if (!pReader->pkChecked) { + pReader->pkComparFn = getComparFunc(rowKey.key.pks[0].type, 0); + pReader->numOfPks = rowKey.key.numOfPKs; + pReader->pkChecked = true; + } + + if (rowKey.key.ts != nextRowKey.key.ts || (pkComp2(pReader, &rowKey, &nextRowKey) != 0)) { // merge is not needed code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); if (code) { return code; @@ -1424,6 +1449,8 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree); if (!hasVal) { // the next value will be the accessed key in stt pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; + + // next file, the timestamps in the next file must be greater than those in current pScanInfo->sttKeyInfo.nextProcKey += step; return false; } @@ -1432,8 +1459,27 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow]; int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; - pSttBlockReader->currentKey = key; - pScanInfo->sttKeyInfo.nextProcKey = key; + if (pSttBlockReader->numOfPks < 0) {// todo handle the deep copy problem + tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey); + pSttBlockReader->numOfPks = pSttBlockReader->currentKey.key.numOfPKs; + if (pSttBlockReader->numOfPks > 0) { + pSttBlockReader->pkComparFn = getComparFunc(pSttBlockReader->currentKey.key.pks[0].type, 0); + } + + pScanInfo->sttKeyInfo.nextProcKey = key; + } else { + if (pSttBlockReader->numOfPks == 0) { + pSttBlockReader->currentKey.key.ts = key; + pSttBlockReader->currentKey.version = ver; + + // todo handle error + pScanInfo->sttKeyInfo.nextProcKey = key; + } else { + // todo handle the deep copy problem + tsdbColRowGetKey(pRow->pBlockData, pRow->iRow, &pSttBlockReader->currentKey); + pScanInfo->sttKeyInfo.nextProcKey = key; + } + } if (pScanInfo->delSkyline != NULL && TARRAY_SIZE(pScanInfo->delSkyline) > 0) { if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange)) { @@ -1451,8 +1497,22 @@ static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBl static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } +static int32_t pkCompEx(__compar_fn_t comparFn, STsdbRowKey* p1, STsdbRowKey* p2) { + if (p1->key.ts < p2->key.ts) { + return -1; + } else if (p1->key.ts > p2->key.ts) { + return 1; + } + + if (p1->key.numOfPKs == 0) { + return 0; + } else { + return comparFn(p1->key.pks[0].pData, p2->key.pks[0].pData); + } +} + static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, - STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, + STableBlockScanInfo* pScanInfo, STsdbRowKey* pSttKey, STsdbReader* pReader, bool* copied) { int32_t code = TSDB_CODE_SUCCESS; *copied = false; @@ -1462,18 +1522,18 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); doUnpinSttBlock(pSttBlockReader); if (hasVal) { - STsdbRowKey key, nextKey; - tsdbRowGetKey(fRow, &key); + STsdbRowKey nextKey; TSDBROW* pNextRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowGetKey(pNextRow, &nextKey); if (!pReader->pkChecked) { - pReader->pkComparFn = getComparFunc(key.key.pks[0].type, 0); + pReader->pkComparFn = getComparFunc(pSttKey->key.pks[0].type, 0); pReader->pkChecked = true; + pReader->numOfPks = pSttKey->key.numOfPKs; } - if (nextKey.key.ts != ts || (pkComp(pReader, fRow, pNextRow) != 0)) { + if (pkCompEx(pReader->pkComparFn, pSttKey, &nextKey) != 0) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); if (code) { return code; @@ -1534,15 +1594,31 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + __compar_fn_t compFn = pReader->pkComparFn; - int64_t tsLast = INT64_MIN; + STsdbRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - tsLast = getCurrentKeyInSttBlock(pSttBlockReader); + pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } STsdbRowKey k; tsdbRowGetKey(pRow, &k); + STSchema* pSchema = NULL; + if (pRow->type == TSDBROW_ROW_FMT) { + pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } + } + + STsdbRowKey* pfKey = &(STsdbRowKey){0}; + if (hasDataInFileBlock(pBlockData, pDumpInfo)) { + tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey); + } else { + pfKey = NULL; + } + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized @@ -1554,65 +1630,53 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - int64_t minKey = 0; +// int64_t minKey = 0; + STsdbRowKey minKey; if (pReader->info.order == TSDB_ORDER_ASC) { - minKey = INT64_MAX; // chosen the minimum value - if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - minKey = tsLast; + minKey = k; // chosen the minimum value + + if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) { + minKey = *pfKey; } - if (minKey > k.key.ts) { - minKey = k.key.ts; - } - - if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { - minKey = key; + if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) { + minKey = *pSttKey; } } else { - minKey = INT64_MIN; - if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - minKey = tsLast; + minKey = k; + + if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) { + minKey = *pfKey; } - if (minKey < k.key.ts) { - minKey = k.key.ts; - } - - if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { - minKey = key; + if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) { + minKey = *pSttKey; } } // ASC: file block ---> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block if (pReader->info.order == TSDB_ORDER_ASC) { - if (minKey == key) { + if (pkCompEx(compFn, &minKey, pfKey) == 0) { int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } - if (minKey == tsLast) { + if (pkCompEx(compFn, &minKey, pSttKey) == 0) { TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); } - if (minKey == k.key.ts) { - STSchema* pTSchema = NULL; - if (pRow->type == TSDBROW_ROW_FMT) { - pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - if (pTSchema == NULL) { - return terrno; - } - } - - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema); + if (pkCompEx(compFn, &minKey, &k) == 0) { + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1623,16 +1687,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } } else { - if (minKey == k.key.ts) { - STSchema* pTSchema = NULL; - if (pRow->type == TSDBROW_ROW_FMT) { - pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - if (pTSchema == NULL) { - return terrno; - } - } - - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pTSchema); + if (pkCompEx(compFn, &minKey, &k) == 0) { + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1643,16 +1699,16 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - if (minKey == tsLast) { + if (pkCompEx(compFn, &minKey, pSttKey) == 0) { TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); } - if (minKey == key) { + if (pkCompEx(compFn, &minKey, pfKey) == 0) { int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1691,11 +1747,12 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo); + if (dataInDataFile && (!dataInSttFile)) { // no stt file block available, only data block exists return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if ((!dataInDataFile) && dataInSttFile) { - // no data ile block exists + // no data in data file exists return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } else if (pBlockScanInfo->cleanSttBlocks && pReader->info.execMode == READER_EXEC_ROWS) { // opt model for count data in stt file, which is not overlap with data blocks in files. @@ -1703,22 +1760,49 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } else { // row in both stt file blocks and data file blocks TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader); + STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); + if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < tsLast) { // asc + if (key < pSttKey->key.ts) { // asc return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key > tsLast) { + } else if (key > pSttKey->key.ts) { return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } + + // key == tsLast. ts is equal and the primary key exists + if (pSttBlockReader->numOfPks > 0) { + if (!pReader->pkChecked) { + pReader->numOfPks = pSttBlockReader->numOfPks; + pReader->pkComparFn = pSttBlockReader->pkComparFn; + pReader->pkChecked = true; + } + + int32_t res = pkComp1(pReader, pSttKey, &fRow); + if (res > 0) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (res < 0) { + return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); + } + } } else { // desc - if (key > tsLast) { + if (key > pSttKey->key.ts) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } else if (key < tsLast) { + } else if (key < pSttKey->key.ts) { return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } + + // key == tsLast. ts is equal and the primary key exists + if (pReader->numOfPks > 0) { + int32_t res = pkComp1(pReader, pSttKey, &fRow); + if (res < 0) { + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if (res > 0) { + return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); + } + } } - // the following for key == tsLast + // the following for key == sttKey->key.ts // ASC: file block ------> stt block // DESC: stt block ------> file block SRow* pTSRow = NULL; @@ -1736,7 +1820,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); } else { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); @@ -1744,7 +1828,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1774,16 +1858,22 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* int32_t code = TSDB_CODE_SUCCESS; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; + __compar_fn_t compFn = pReader->pkComparFn; TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); - int64_t tsLast = INT64_MIN; + STsdbRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - tsLast = getCurrentKeyInSttBlock(pSttBlockReader); + pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } - int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; + STsdbRowKey* pfKey = &(STsdbRowKey){0}; + if (hasDataInFileBlock(pBlockData, pDumpInfo)) { + tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex, pfKey); + } else { + pfKey = NULL; + } STsdbRowKey k, ik; tsdbRowGetKey(pRow, &k); @@ -1814,47 +1904,40 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - int64_t minKey = 0; + STsdbRowKey minKey; if (ASCENDING_TRAVERSE(pReader->info.order)) { - minKey = INT64_MAX; // let's find the minimum - if (minKey > k.key.ts) { - minKey = k.key.ts; + minKey = k; // let's find the minimum + + if (pkCompEx(compFn, &ik, &minKey) < 0) {//minKey > ik.key.ts) { + minKey = ik; } - if (minKey > ik.key.ts) { - minKey = ik.key.ts; + if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) < 0)) { + minKey = *pfKey; } - if (minKey > key && hasDataInFileBlock(pBlockData, pDumpInfo)) { - minKey = key; - } - - if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - minKey = tsLast; + if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) < 0)) { + minKey = *pSttKey; } } else { - minKey = INT64_MIN; // let find the maximum ts value - if (minKey < k.key.ts) { - minKey = k.key.ts; + minKey = k; // let find the maximum ts value + if (pkCompEx(compFn, &ik, &minKey) > 0) { + minKey = ik; } - if (minKey < ik.key.ts) { - minKey = ik.key.ts; + if (hasDataInFileBlock(pBlockData, pDumpInfo) && (pkCompEx(compFn, pfKey, &minKey) > 0)) { + minKey = *pfKey; } - if (minKey < key && hasDataInFileBlock(pBlockData, pDumpInfo)) { - minKey = key; - } - - if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - minKey = tsLast; + if (pSttKey != NULL && (pkCompEx(compFn, pSttKey, &minKey) > 0)) { + minKey = *pSttKey; } } // ASC: file block -----> stt block -----> imem -----> mem // DESC: mem -----> imem -----> stt block -----> file block if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (minKey == key) { + if (pkCompEx(compFn, &minKey, pfKey) == 0) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1864,17 +1947,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } - if (minKey == tsLast) { + if (pkCompEx(compFn, &minKey, pSttKey) == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); } - if (minKey == ik.key.ts) { + if (pkCompEx(compFn, &minKey, &ik) == 0) { code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1886,7 +1969,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - if (minKey == k.key.ts) { + if (pkCompEx(compFn, &minKey, &k) == 0) { code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1898,7 +1981,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } } else { - if (minKey == k.key.ts) { + if (pkCompEx(compFn, &minKey, &k) == 0) { code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1910,7 +1993,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - if (minKey == ik.key.ts) { + if (pkCompEx(compFn, &minKey, &ik) == 0) { code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1922,17 +2005,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - if (minKey == tsLast) { + if (pkCompEx(compFn, &minKey, pSttKey) == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); } - if (minKey == key) { + if (pkCompEx(compFn, &minKey, pfKey) == 0) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1996,13 +2079,13 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea if (ASCENDING_TRAVERSE(pReader->info.order)) { startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer, .key = { - .ts = pBlockScanInfo->lastProcKey + 1, + .ts = pBlockScanInfo->lastProcKey.key.ts + 1, .numOfPKs = 0, // TODO: change here if multi-key is supported }}; } else { startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer, .key = { - .ts = pBlockScanInfo->lastProcKey - 1, + .ts = pBlockScanInfo->lastProcKey.key.ts - 1, .numOfPKs = 0, // TODO: change here if multi-key is supported }}; } @@ -2027,7 +2110,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea } static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STableBlockScanInfo* pBlockScanInfo, bool asc, - STsdbReaderInfo* pInfo) { + STsdbReaderInfo* pInfo, STsdbReader* pReader) { // it is an multi-table data block if (pBlockData->aUid != NULL) { uint64_t uid = pBlockData->aUid[rowIndex]; @@ -2047,10 +2130,25 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable return false; } - if ((asc && (ts <= pBlockScanInfo->lastProcKey)) || ((!asc) && (ts >= pBlockScanInfo->lastProcKey))) { + if ((asc && (ts < pBlockScanInfo->lastProcKey.key.ts)) || ((!asc) && (ts > pBlockScanInfo->lastProcKey.key.ts))) { return false; } + if (ts == pBlockScanInfo->lastProcKey.key.ts) { // todo opt perf + STsdbRowKey nextRowKey; + tsdbColRowGetKey(pBlockData, rowIndex, &nextRowKey); + + if (!pReader->pkChecked) { + pReader->pkComparFn = getComparFunc(pBlockScanInfo->lastProcKey.key.pks[0].type, 0); + pReader->pkChecked = true; + pReader->numOfPks = nextRowKey.key.numOfPKs; + } + + if (pkComp2(pReader, &pBlockScanInfo->lastProcKey, &nextRowKey) == 0) { + return false; + } + } + if (pBlockScanInfo->delSkyline != NULL && TARRAY_SIZE(pBlockScanInfo->delSkyline) > 0) { bool dropped = hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pInfo->order, &pInfo->verRange); @@ -2196,7 +2294,12 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } if (copied) { - pBlockScanInfo->lastProcKey = key; + if (pReader->numOfPks == 0) { + pBlockScanInfo->lastProcKey.key.ts = key; + } else { // todo use deep copy instead of shallow copy + int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1 : -1; + tsdbColRowGetKey(pBlockData, pDumpInfo->rowIndex - step, &pBlockScanInfo->lastProcKey); + } return TSDB_CODE_SUCCESS; } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2223,9 +2326,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { - bool copied = false; - SRow* pTSRow = NULL; - int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); + bool copied = false; + SRow* pTSRow = NULL; + STsdbRowKey* pSttKey = NULL;//getCurrentKeyInSttBlock(pSttBlockReader); + + STsdbRowKey newSttKey; + tsdbRowKeyAssign(&newSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); + pSttKey = &newSttKey; SRowMerger* pMerger = &pReader->status.merger; TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); @@ -2234,13 +2341,13 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid, fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); - int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); + int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, pSttKey, pReader, &copied); if (code) { return code; } if (copied) { - pBlockScanInfo->lastProcKey = tsLastBlock; + tsdbRowKeyAssign(&pBlockScanInfo->lastProcKey, pSttKey); return TSDB_CODE_SUCCESS; } else { code = tsdbRowMergerAdd(pMerger, &fRow, NULL); @@ -2250,8 +2357,7 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, - pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2365,7 +2471,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo == NULL) { return 0; } @@ -2375,7 +2480,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { return code; } - pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { goto _end; } @@ -2384,18 +2489,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // it is a clean block, load it directly int64_t cap = pReader->resBlockInfo.capacity; - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pBlockInfo->numRow <= cap)) { - if (((asc && (pBlockInfo->firstKey < keyInBuf.ts)) || (!asc && (pBlockInfo->lastKey > keyInBuf.ts))) && - (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) { - code = copyBlockDataToSDataBlock(pReader); - if (code) { - goto _end; - } - - // record the last key value - pBlockScanInfo->lastProcKey = asc ? pBlockInfo->lastKey : pBlockInfo->firstKey; - goto _end; - } + bool directCopy = isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && + (pBlockInfo->numRow <= cap) && (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) && + ((asc && ((pBlockInfo->lastKey < keyInBuf.ts) || (keyInBuf.ts == INT64_MIN))) || + (!asc && (pBlockInfo->lastKey > keyInBuf.ts))); + if (directCopy) { + code = copyBlockDataToSDataBlock(pReader, &pBlockScanInfo->lastProcKey); + goto _end; } SBlockData* pBlockData = &pReader->status.fileBlockData; @@ -2406,7 +2506,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { { while (pBlockData->nRow > 0 && pBlockData->uid == pBlockScanInfo->uid) { // find the first qualified row in data block - if (isValidFileBlockRow(pBlockData, pDumpInfo->rowIndex, pBlockScanInfo, asc, &pReader->info)) { + if (isValidFileBlockRow(pBlockData, pDumpInfo->rowIndex, pBlockScanInfo, asc, &pReader->info, pReader)) { hasBlockData = true; break; } @@ -2699,7 +2799,8 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; - pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; + ASSERT(0); + pScanInfo->lastProcKey.key.ts = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey; pScanInfo->sttBlockReturned = true; pSttBlockReader->mergeTree.pIter = NULL; @@ -2725,7 +2826,18 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); // update the last key for the corresponding table - pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey; + SRowKey* pKey = &pScanInfo->lastProcKey.key; + pKey->ts = asc ? pInfo->window.ekey : pInfo->window.skey; + pKey->numOfPKs = pReader->numOfPks; + + // todo opt allocation +// if (IS_NUMERIC_TYPE(p)) { + pKey->pks[0].val = asc? pBlockInfo->lastPrimaryKey.val:pBlockInfo->firstPrimaryKey.val; +// } else { +// int32_t len = asc? pBlockInfo->lastPKLen:pBlockInfo->firstPKLen; +// char* p = taosMemoryRealloc(pKey->pks[0].pData, len); +// } + tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", @@ -2931,10 +3043,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } // data in stt now overlaps with current active file data block, need to composed with file data block. - int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader); - if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) { + STsdbRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); + if ((pSttKey->key.ts >= pBlockInfo->firstKey && asc) || (pSttKey->key.ts <= pBlockInfo->lastKey && (!asc))) { tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, - lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); + pSttKey->key.ts, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr); break; } } @@ -3046,20 +3158,22 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; if (pBlockInfo) { - STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); - if (pScanInfo) { - lastKey = pScanInfo->lastProcKey; - } +// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); +// if (pScanInfo) { +// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey); +// lastKey = pScanInfo->lastProcKey; +// } pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; } else { pDumpInfo->totalRows = 0; pDumpInfo->rowIndex = 0; +// pDumpInfo->lastKey.key.ts = lastKey; } pDumpInfo->allDumped = false; - pDumpInfo->lastKey = lastKey; +// pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { @@ -3173,9 +3287,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } - while (1) { - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SBlockData* pBlockData = &pReader->status.fileBlockData; + while (1) { if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded code = buildComposedDataBlock(pReader); } else { @@ -3188,7 +3303,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } else { // all data blocks in files are checked, let's check the data in last files. // data blocks in current file are exhausted, let's try the next file now - SBlockData* pBlockData = &pReader->status.fileBlockData; if (pBlockData->uid != 0) { tBlockDataClear(pBlockData); } @@ -3553,14 +3667,19 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } -int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, + + +int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbRowKey* pRowKey, SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) { - int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader); - if (next1 == ts) { + STsdbRowKey* next1 = getCurrentKeyInSttBlock(pSttBlockReader); + + int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pRowKey, next1); + if (ret == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); } else { + ASSERT(ret < 0); tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->sttKeyInfo.nextProcKey, idStr); @@ -3822,7 +3941,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT pBlock->info.dataLoad = 1; pBlock->info.rows += 1; - pScanInfo->lastProcKey = pTSRow->ts; + + // todo no version + TSDBROW row = {.pTSRow = pTSRow, .type = TSDBROW_ROW_FMT}; + tsdbRowGetKey(&row, &pScanInfo->lastProcKey); +// pScanInfo->lastProcKey = pTSRow->ts; return TSDB_CODE_SUCCESS; } @@ -3899,13 +4022,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return code; } - pBlockScanInfo->lastProcKey = ts; + tsdbRowGetKey(&row, &pBlockScanInfo->lastProcKey); } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) { - break; + return code; } - pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow]; + tsdbColRowGetKey(row.pBlockData, row.iRow, &pBlockScanInfo->lastProcKey); } // no data in buffer, return immediately @@ -3959,11 +4082,13 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t // todo extract method if (ASCENDING_TRAVERSE(pReader->info.order)) { int64_t skey = pReader->info.window.skey; - pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey; + ASSERT(0); +// pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey; pInfo->sttKeyInfo.nextProcKey = skey; } else { int64_t ekey = pReader->info.window.ekey; - pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + ASSERT(0); +// pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pInfo->sttKeyInfo.nextProcKey = ekey; } @@ -4279,7 +4404,8 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) { while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; clearBlockScanInfo(pInfo); - pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step; + ASSERT(0); +// pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step; } pStatus->uidList.currentIndex = 0; @@ -4798,7 +4924,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { return NULL; } - code = copyBlockDataToSDataBlock(pReader); + code = copyBlockDataToSDataBlock(pReader, &pBlockScanInfo->lastProcKey); if (code != TSDB_CODE_SUCCESS) { tBlockDataReset(&pStatus->fileBlockData); terrno = code; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 4c742c0572..e99ea3467a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -165,18 +165,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { int64_t skey = pTsdbReader->info.window.skey; - pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey; + pScanInfo->lastProcKey.key.ts = (skey > INT64_MIN) ? (skey - 1) : skey; pScanInfo->sttKeyInfo.nextProcKey = skey; } else { int64_t ekey = pTsdbReader->info.window.ekey; - pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; + pScanInfo->lastProcKey.key.ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey; pScanInfo->sttKeyInfo.nextProcKey = ekey; } pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, - pScanInfo->lastProcKey, pTsdbReader->idStr); + pScanInfo->lastProcKey.key.ts, pTsdbReader->idStr); } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); @@ -209,7 +209,9 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { } pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - pInfo->lastProcKey = ts; + pInfo->lastProcKey.key.ts = ts; + ASSERT(0); + pInfo->sttKeyInfo.nextProcKey = ts + step; } } @@ -355,7 +357,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { +static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) { pBlockInfo->uid = record->uid; pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts; @@ -368,6 +370,31 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor pBlockInfo->smaSize = record->smaSize; pBlockInfo->numRow = record->numRow; pBlockInfo->count = record->count; + + SRowKey* pFirstKey = &record->firstKey.key; + if (!pReader->pkChecked) { + pReader->pkChecked = true; + pReader->numOfPks = pFirstKey->numOfPKs; + pReader->pkComparFn = getComparFunc(pFirstKey->pks[0].type, 0); + } + + if (pFirstKey->numOfPKs > 0) { + if (IS_NUMERIC_TYPE(pFirstKey->pks[0].type)) { + pBlockInfo->firstPrimaryKey.val = pFirstKey->pks[0].val; + pBlockInfo->lastPrimaryKey.val = record->lastKey.key.pks[0].val; + + pBlockInfo->firstPKLen = 0; + pBlockInfo->lastPKLen = 0; + } else { // todo handle memory alloc error, opt memory alloc perf + pBlockInfo->firstPKLen = pFirstKey->pks[0].nData; + pBlockInfo->firstPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->firstPKLen); + memcpy(pBlockInfo->firstPrimaryKey.pData, pFirstKey->pks[0].pData, pBlockInfo->firstPKLen); + + pBlockInfo->lastPKLen = record->lastKey.key.pks[0].nData; + pBlockInfo->lastPrimaryKey.pData = taosMemoryCalloc(1, pBlockInfo->lastPKLen); + memcpy(pBlockInfo->lastPrimaryKey.pData, record->lastKey.key.pks[0].pData, pBlockInfo->lastPKLen); + } + } } int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { @@ -392,7 +419,6 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 for (int32_t i = 0; i < numOfTables; ++i) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i); - // ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0); size_t num = taosArrayGetSize(pTableScanInfo->pBlockList); sup.numOfBlocksPerTable[sup.numOfTables] = num; @@ -426,15 +452,17 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 if (pTableScanInfo->pBlockIdxList == NULL) { pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx)); } + for (int32_t i = 0; i < numOfBlocks; ++i) { SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); - recordToBlockInfo(&blockInfo, record); + recordToBlockInfo(&blockInfo, record, pReader); taosArrayPush(pBlockIter->blockList, &blockInfo); STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); } + pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); int64_t et = taosGetTimestampUs(); @@ -464,7 +492,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - recordToBlockInfo(&blockInfo, record); + recordToBlockInfo(&blockInfo, record, pReader); taosArrayPush(pBlockIter->blockList, &blockInfo); STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 9fd52d4823..2dbae87c56 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -77,7 +77,7 @@ typedef enum ESttKeyStatus { typedef struct SSttKeyInfo { ESttKeyStatus status; // this value should be updated when switch to the next fileset - int64_t nextProcKey; + int64_t nextProcKey; // todo remove this attribute, since it is impossible to set correct nextProcKey value } SSttKeyInfo; // clean stt file blocks: @@ -87,7 +87,8 @@ typedef struct SSttKeyInfo { // 4. not overlap with data file blocks typedef struct STableBlockScanInfo { uint64_t uid; - TSKEY lastProcKey; // todo: refactor: add primary key +// TSKEY lastProcKey; // todo: refactor: add primary key + STsdbRowKey lastProcKey; SSttKeyInfo sttKeyInfo; SArray* pBlockList; // block data index list, SArray SArray* pBlockIdxList; // SArray @@ -168,7 +169,9 @@ typedef struct SSttBlockReader { int32_t order; uint64_t uid; SMergeTree mergeTree; - int64_t currentKey; + STsdbRowKey currentKey; + int32_t numOfPks; + __compar_fn_t pkComparFn; } SSttBlockReader; typedef struct SFilesetIter { @@ -181,12 +184,21 @@ typedef struct SFilesetIter { typedef struct SFileDataBlockInfo { // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it - // int64_t suid; int64_t uid; int64_t firstKey; -// int64_t firstKeyVer; + union { + int64_t val; + uint8_t* pData; + } firstPrimaryKey; + int64_t lastKey; -// int64_t lastKeyVer; + union { + int64_t val; + uint8_t* pData; + } lastPrimaryKey; + + int32_t firstPKLen; + int32_t lastPKLen; int64_t minVer; int64_t maxVer; int64_t blockOffset; @@ -211,7 +223,8 @@ typedef struct SDataBlockIter { typedef struct SFileBlockDumpInfo { int32_t totalRows; int32_t rowIndex; - int64_t lastKey; +// int64_t lastKey; +// STsdbRowKey lastKey; // this key should be removed bool allDumped; } SFileBlockDumpInfo; @@ -249,7 +262,6 @@ struct STsdbReader { TdThreadMutex readerMutex; EReaderStatus flag; int32_t code; - uint64_t rowsNum; SResultBlockInfo resBlockInfo; SReaderStatus status; char* idStr; // query info handle, for debug purpose @@ -268,6 +280,7 @@ struct STsdbReader { TsdReaderNotifyCbFn notifyFn; void* notifyParam; __compar_fn_t pkComparFn; + int32_t numOfPks; bool pkChecked; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index f708700b1a..d701387263 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -623,6 +623,54 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) { } } +void tsdbColRowGetKey(SBlockData* pBlock, int32_t irow, STsdbRowKey* key) { + key->version = pBlock->aVersion[irow]; + key->key.ts = pBlock->aTSKEY[irow]; + key->key.numOfPKs = 0; + + for (int32_t i = 0; i < pBlock->nColData; i++) { + SColData *pColData = &pBlock->aColData[i]; + if (pColData->cflag & COL_IS_KEY) { + SColVal cv; + tColDataGetValue(pColData, irow, &cv); + ASSERT(COL_VAL_IS_VALUE(&cv)); + key->key.pks[key->key.numOfPKs] = cv.value; + key->key.numOfPKs++; + } else { + break; + } + } +} + +int32_t tsdbRowKeyAssign(STsdbRowKey *pDst, STsdbRowKey* pSrc) { + pDst->version = pSrc->version; + + if (pSrc->key.numOfPKs == 0) { + pDst->key.ts = pSrc->key.ts; + pDst->key.numOfPKs = 0; + } else { + pDst->key = pSrc->key; + + for (int32_t i = 0; i < pDst->key.numOfPKs; ++i) { + SValue *pVal = &pDst->key.pks[i]; + if (IS_NUMERIC_TYPE(pVal->type)) { + continue; + } + + uint8_t *p = taosMemoryMalloc(pVal->nData); + if (p == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + memcpy(p, pVal->pData, pVal->nData); + pVal->pData = p; + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2) { int32_t c = tRowKeyCompare(&key1->key, &key2->key); @@ -726,7 +774,6 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; - // goto _exit; } // other