From 750ea5789f4246be2874d0ac305d7f65a0ed1af0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Mar 2024 14:20:46 +0800 Subject: [PATCH] fix(tsdb): fix errors identified by CI. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 69 +++++++++++++++++-------- source/dnode/vnode/src/tsdb/tsdbUtil.c | 2 +- 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6e134698b5..891fd823e7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -141,7 +141,7 @@ static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) { } static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { - pKey->ts = pKey->ts; + pKey->ts = pRow->ts; pKey->numOfPKs = pRow->numOfPKs; if (pKey->numOfPKs == 0) { return; @@ -164,7 +164,7 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) { pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData); pKey->pks[i].pData += pKey->pks[i].nData; } else { - pKey->pks[i].val = *(int64_t*) data + indices[i].offset; + pKey->pks[i].val = *(int64_t*) (data + indices[i].offset); } } } @@ -694,7 +694,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN w.ekey = pScanInfo->lastProcKey.ts; } - if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 2) { // NOTE: specialized for open interval + if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 1) { // NOTE: specialized for open interval k += 1; if (k >= numOfTables) { @@ -2012,41 +2012,58 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan return code; } +static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + SRowKey rowKey; + + while (1) { + TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader); + if (!pIter->hasVal) { + break; + } + + tRowGetKeyEx(pRow, &rowKey); + int32_t ret = pkCompEx(pReader->pkComparFn, pKey, &rowKey); + if (ret == 0) { + pIter->hasVal = tsdbTbDataIterNext(pIter->iter); + } else { + break; + } + } +} + +// handle the open interval issue. Find the first row key that is greater than the given one. +static int32_t forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + doForwardDataIter(pKey, &pBlockScanInfo->iter, pBlockScanInfo, pReader); + doForwardDataIter(pKey, &pBlockScanInfo->iiter, pBlockScanInfo, pReader); + return TSDB_CODE_SUCCESS; +} + static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + STbData* d = NULL; + STbData* di = NULL; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + STsdbReadSnap* pSnap = pReader->pReadSnap; + if (pBlockScanInfo->iterInit) { return TSDB_CODE_SUCCESS; } - STbData* d = NULL; STsdbRowKey startKey = {0}; - if (ASCENDING_TRAVERSE(pReader->info.order)) { - startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer, - .key = { - .ts = pBlockScanInfo->lastProcKey.ts + 1, - .numOfPKs = pReader->suppInfo.numOfPks, - }}; - } else { - startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer, - .key = { - .ts = pBlockScanInfo->lastProcKey.ts - 1, - .numOfPKs = pReader->suppInfo.numOfPks, - }}; - } + tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey); + startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer; - int32_t code = - doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem"); + int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem"); if (code != TSDB_CODE_SUCCESS) { return code; } - STbData* di = NULL; - code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, &pBlockScanInfo->iiter, - "imem"); + code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pSnap->pIMem, &pBlockScanInfo->iiter, "imem"); if (code != TSDB_CODE_SUCCESS) { return code; } loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); + forwardDataIter(&startKey.key, pBlockScanInfo, pReader); pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -4128,6 +4145,9 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi blockDataEnsureCapacity(pResBlock, capacity); } + // for debug purpose +// capacity = 7; + int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -4894,6 +4914,10 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { SReaderStatus* pStatus = &pTReader->status; if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) { + +// tsdbReaderSuspend2(pReader); +// tsdbReaderResume2(pReader); + return pTReader->resBlockInfo.pResBlock; } @@ -4904,6 +4928,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { // tsdbReaderSuspend2(pReader); // tsdbReaderResume2(pReader); + return ret; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index fe7a3457bd..59452ebb9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -633,7 +633,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) { pDst->numOfPKs = pSrc->numOfPKs; if (pSrc->numOfPKs > 0) { - for (int32_t i = 0; i < pDst->numOfPKs; ++i) { + for (int32_t i = 0; i < pSrc->numOfPKs; ++i) { SValue *pVal = &pDst->pks[i]; pVal->type = pSrc->pks[i].type;