Merge pull request #25116 from taosdata/fix/3_liaohj

fix(stream):adjust current offset by using the scanning version, not …
This commit is contained in:
Haojun Liao 2024-03-19 16:44:58 +08:00 committed by GitHub
commit 7ff383cb1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 11 additions and 19 deletions

View File

@ -246,32 +246,20 @@ int metaAlterCache(SMeta *pMeta, int32_t nPage) {
} }
int32_t metaRLock(SMeta *pMeta) { int32_t metaRLock(SMeta *pMeta) {
int32_t ret = 0;
metaTrace("meta rlock %p", &pMeta->lock); metaTrace("meta rlock %p", &pMeta->lock);
int32_t ret = taosThreadRwlockRdlock(&pMeta->lock);
ret = taosThreadRwlockRdlock(&pMeta->lock);
return ret; return ret;
} }
int32_t metaWLock(SMeta *pMeta) { int32_t metaWLock(SMeta *pMeta) {
int32_t ret = 0;
metaTrace("meta wlock %p", &pMeta->lock); metaTrace("meta wlock %p", &pMeta->lock);
int32_t ret = taosThreadRwlockWrlock(&pMeta->lock);
ret = taosThreadRwlockWrlock(&pMeta->lock);
return ret; return ret;
} }
int32_t metaULock(SMeta *pMeta) { int32_t metaULock(SMeta *pMeta) {
int32_t ret = 0;
metaTrace("meta ulock %p", &pMeta->lock); metaTrace("meta ulock %p", &pMeta->lock);
int32_t ret = taosThreadRwlockUnlock(&pMeta->lock);
ret = taosThreadRwlockUnlock(&pMeta->lock);
return ret; return ret;
} }

View File

@ -429,7 +429,7 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo); code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr); tsdbDebug("load the stt file blk info completed, elapsed time:%.2fms, %s", el, idStr);
return code; return code;
} }
@ -806,7 +806,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
tMergeTreeAddIter(pMTree, pIter); tMergeTreeAddIter(pMTree, pIter);
// let's record the time window for current table of uid in the stt files // let's record the time window for current table of uid in the stt files
if (pSttDataInfo != NULL) { if (pSttDataInfo != NULL && numOfRows > 0) {
taosArrayPush(pSttDataInfo->pTimeWindowList, &w); taosArrayPush(pSttDataInfo->pTimeWindowList, &w);
pSttDataInfo->numOfRows += numOfRows; pSttDataInfo->numOfRows += numOfRows;
} }

View File

@ -2093,7 +2093,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA; pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey = pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true; hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks } else { // not clean stt blocks
INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window INIT_TIMEWINDOW(&pScanInfo->sttWindow); //reset the time window
pScanInfo->sttBlockReturned = false; pScanInfo->sttBlockReturned = false;

View File

@ -1128,7 +1128,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
} }
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = (*pTask)->chkInfo.nextProcessVer - 1; entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
if (entry.processedVer < 0) {
entry.processedVer = (*pTask)->chkInfo.processedVer;
}
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
} }