Merge pull request #24419 from wangjiaming0909/draft/fix/TD-28182

fix: table merge scan return disordered rows
This commit is contained in:
dapan1121 2024-01-12 10:16:21 +08:00 committed by GitHub
commit 3b928237d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 1 deletions

View File

@ -73,6 +73,7 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static void resetTableListIndex(SReaderStatus* pStatus);
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey);
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -3040,6 +3041,17 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
return TSDB_READ_RETURN;
}
if (pReader->status.bProcMemPreFileset) {
code = buildFromPreFilesetBuffer(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pResBlock->info.rows > 0) {
pReader->status.processingMemPreFileSet = true;
return TSDB_READ_RETURN;
}
}
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
return TSDB_READ_CONTINUE;
} else { // all blocks in data file are checked, let's check the data in last files
@ -4297,6 +4309,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
} else {
tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr);
pStatus->bProcMemPreFileset = false;
pStatus->processingMemPreFileSet = false;
if (pReader->notifyFn) {
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;
@ -4329,7 +4342,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
pStatus->bProcMemFirstFileset, pReader->idStr);
if (pStatus->bProcMemPreFileset) {
if (pBlock->info.rows > 0) {
if (pReader->notifyFn) {
if (pReader->notifyFn && !pReader->status.processingMemPreFileSet) {
int32_t fid = pReader->status.pCurrentFileset->fid;
STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid;

View File

@ -238,6 +238,7 @@ typedef struct SReaderStatus {
int64_t prevFilesetStartKey;
int64_t prevFilesetEndKey;
bool bProcMemFirstFileset;
bool processingMemPreFileSet;
STableUidList procMemUidList;
STableBlockScanInfo** pProcMemTableIter;
} SReaderStatus;