fix bug
This commit is contained in:
parent
259815fd83
commit
34bf638f19
|
@ -40,9 +40,7 @@ typedef struct SUpdateInfo {
|
||||||
TSKEY minTS;
|
TSKEY minTS;
|
||||||
SScalableBf *pCloseWinSBF;
|
SScalableBf *pCloseWinSBF;
|
||||||
SHashObj *pMap;
|
SHashObj *pMap;
|
||||||
STimeWindow scanWindow;
|
uint64_t maxDataVersion;
|
||||||
uint64_t scanGroupId;
|
|
||||||
uint64_t maxVersion;
|
|
||||||
} SUpdateInfo;
|
} SUpdateInfo;
|
||||||
|
|
||||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
|
||||||
|
@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
||||||
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||||
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
||||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
|
||||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
|
|
||||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||||
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
||||||
|
|
|
@ -1022,8 +1022,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
|
||||||
pInfo->groupId = groupCol[rowIndex];
|
pInfo->groupId = groupCol[rowIndex];
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) {
|
||||||
pTableScanInfo->base.cond.twindows = *pWin;
|
pTableScanInfo->base.cond.twindows = *pWin;
|
||||||
|
pTableScanInfo->base.cond.endVersion = version;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
pTableScanInfo->currentGroupId = -1;
|
pTableScanInfo->currentGroupId = -1;
|
||||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||||
|
@ -1142,7 +1143,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
|
resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
|
||||||
pInfo->pTableScanOp->status = OP_OPENED;
|
pInfo->pTableScanOp->status = OP_OPENED;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1784,6 +1785,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
|
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
|
||||||
if (pInfo->pUpdateInfo) {
|
if (pInfo->pUpdateInfo) {
|
||||||
|
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
|
||||||
checkUpdateData(pInfo, true, pBlock, true);
|
checkUpdateData(pInfo, true, pBlock, true);
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
|
||||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
@ -1845,7 +1847,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
qDebug("4");
|
qDebug("4");
|
||||||
|
|
||||||
|
@ -1891,8 +1892,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
|
|
||||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
printDataBlock(pSDB, "scan recover update");
|
printDataBlock(pSDB, "scan recover update");
|
||||||
|
@ -1961,6 +1960,9 @@ FETCH_NEXT_BLOCK:
|
||||||
pBlock->info.calWin.skey = INT64_MIN;
|
pBlock->info.calWin.skey = INT64_MIN;
|
||||||
pBlock->info.calWin.ekey = INT64_MAX;
|
pBlock->info.calWin.ekey = INT64_MAX;
|
||||||
pBlock->info.dataLoad = 1;
|
pBlock->info.dataLoad = 1;
|
||||||
|
if (pInfo->pUpdateInfo) {
|
||||||
|
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
|
||||||
|
}
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
switch (pBlock->info.type) {
|
switch (pBlock->info.type) {
|
||||||
case STREAM_NORMAL:
|
case STREAM_NORMAL:
|
||||||
|
@ -2058,8 +2060,6 @@ FETCH_NEXT_BLOCK:
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
|
|
||||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
printDataBlock(pSDB, "stream scan update");
|
printDataBlock(pSDB, "stream scan update");
|
||||||
|
@ -2125,13 +2125,6 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, &block, false);
|
setBlockIntoRes(pInfo, &block, false);
|
||||||
|
|
||||||
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
|
|
||||||
pInfo->pRes->info.version)) {
|
|
||||||
printDataBlock(pInfo->pRes, "stream scan ignore");
|
|
||||||
blockDataCleanup(pInfo->pRes);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||||
return pInfo->pCreateTbRes;
|
return pInfo->pCreateTbRes;
|
||||||
|
|
|
@ -1066,6 +1066,7 @@ void streamStateDestroy(SStreamState* pState) {
|
||||||
#ifdef USE_ROCKSDB
|
#ifdef USE_ROCKSDB
|
||||||
streamFileStateDestroy(pState->pFileState);
|
streamFileStateDestroy(pState->pFileState);
|
||||||
streamStateDestroy_rocksdb(pState);
|
streamStateDestroy_rocksdb(pState);
|
||||||
|
taosMemoryFreeClear(pState->parNameMap);
|
||||||
// do nothong
|
// do nothong
|
||||||
#endif
|
#endif
|
||||||
taosMemoryFreeClear(pState->pTdbState);
|
taosMemoryFreeClear(pState->pTdbState);
|
||||||
|
|
|
@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
||||||
pInfo->pCloseWinSBF = NULL;
|
pInfo->pCloseWinSBF = NULL;
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
||||||
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
||||||
pInfo->maxVersion = 0;
|
pInfo->maxDataVersion = 0;
|
||||||
pInfo->scanGroupId = 0;
|
|
||||||
pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
|
||||||
return pInfo;
|
return pInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
|
|
||||||
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
|
||||||
pWin->skey, pWin->ekey, version);
|
|
||||||
pInfo->scanWindow = *pWin;
|
|
||||||
pInfo->scanGroupId = groupId;
|
|
||||||
pInfo->maxVersion = version;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
|
|
||||||
if (!pInfo) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
|
||||||
pWin->skey, pWin->ekey, version);
|
|
||||||
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
|
|
||||||
version <= pInfo->maxVersion) {
|
|
||||||
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
|
|
||||||
pWin->skey, pWin->ekey, version);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void updateInfoDestroy(SUpdateInfo *pInfo) {
|
void updateInfoDestroy(SUpdateInfo *pInfo) {
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
||||||
if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
|
if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
|
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1;
|
|
||||||
if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1;
|
|
||||||
if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1;
|
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
||||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||||
}
|
}
|
||||||
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
|
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
|
||||||
|
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1;
|
|
||||||
if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1;
|
|
||||||
if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1;
|
|
||||||
if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1;
|
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue