fix(stream): remove the shared ptr between reader ptr.
This commit is contained in:
parent
0df1cff34e
commit
ba38a625cf
|
@ -173,7 +173,6 @@ typedef struct SReaderStatus {
|
||||||
SFilesetIter fileIter;
|
SFilesetIter fileIter;
|
||||||
SDataBlockIter blockIter;
|
SDataBlockIter blockIter;
|
||||||
SArray* pLDataIterArray;
|
SArray* pLDataIterArray;
|
||||||
// SLDataIter* pLDataIter;
|
|
||||||
SRowMerger merger;
|
SRowMerger merger;
|
||||||
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
|
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
|
||||||
} SReaderStatus;
|
} SReaderStatus;
|
||||||
|
@ -2444,7 +2443,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
// row in last file block
|
// row in last file block
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
// ASSERT(ts >= key);
|
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||||
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
|
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
|
||||||
|
@ -2788,41 +2786,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo,
|
static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) {
|
||||||
uint64_t maxVer) {
|
|
||||||
int32_t size = taosArrayGetSize(pLDataIterList);
|
|
||||||
if (size <= 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t uid = pBlockScanInfo->uid;
|
|
||||||
if (pBlockScanInfo->pDelData == NULL) {
|
|
||||||
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
|
|
||||||
}
|
|
||||||
|
|
||||||
STombRecord record = {0};
|
STombRecord record = {0};
|
||||||
|
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
|
||||||
SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
|
|
||||||
|
|
||||||
int32_t numOfIter = taosArrayGetSize(pLeveledLDataIter);
|
|
||||||
if (numOfIter == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t f = 0; f < numOfIter; ++f) {
|
|
||||||
SLDataIter* pIter = taosArrayGetP(pLeveledLDataIter, f);
|
|
||||||
|
|
||||||
SArray* pTombBlockArray = pIter->pBlockLoadInfo->pTombBlockArray;
|
|
||||||
|
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pTombBlockArray);
|
|
||||||
for (int32_t k = 0; k < numOfBlocks; ++k) {
|
|
||||||
STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k);
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < pBlock->suid->size; ++j) {
|
for (int32_t j = 0; j < pBlock->suid->size; ++j) {
|
||||||
int32_t code = tTombBlockGet(pBlock, j, &record);
|
int32_t code = tTombBlockGet(pBlock, j, &record);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// todo handle error
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (record.suid < suid) {
|
if (record.suid < suid) {
|
||||||
|
@ -2840,9 +2810,45 @@ static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t su
|
||||||
|
|
||||||
if (record.version <= maxVer) {
|
if (record.version <= maxVer) {
|
||||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||||
taosArrayPush(pBlockScanInfo->pDelData, &delData);
|
taosArrayPush(pData, &delData);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo,
|
||||||
|
uint64_t maxVer) {
|
||||||
|
int32_t size = taosArrayGetSize(pLDataIterList);
|
||||||
|
if (size <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t uid = pBlockScanInfo->uid;
|
||||||
|
if (pBlockScanInfo->pDelData == NULL) {
|
||||||
|
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
|
SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
|
||||||
|
|
||||||
|
int32_t numOfIter = taosArrayGetSize(pLeveledLDataIter);
|
||||||
|
if (numOfIter == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t f = 0; f < numOfIter; ++f) {
|
||||||
|
SLDataIter* pIter = taosArrayGetP(pLeveledLDataIter, f);
|
||||||
|
|
||||||
|
SArray* pTombBlockArray = pIter->pBlockLoadInfo->pTombBlockArray;
|
||||||
|
int32_t numOfBlocks = taosArrayGetSize(pTombBlockArray);
|
||||||
|
for (int32_t k = 0; k < numOfBlocks; ++k) {
|
||||||
|
STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k);
|
||||||
|
|
||||||
|
int32_t code = checkTombBlockRecords(pBlockScanInfo->pDelData, pBlock, suid, uid, maxVer);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4803,7 +4809,6 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clearSharedPtr(STsdbReader* p) {
|
static void clearSharedPtr(STsdbReader* p) {
|
||||||
p->status.pLDataIterArray = NULL;
|
|
||||||
p->status.pTableMap = NULL;
|
p->status.pTableMap = NULL;
|
||||||
p->status.uidList.tableUidList = NULL;
|
p->status.uidList.tableUidList = NULL;
|
||||||
p->pReadSnap = NULL;
|
p->pReadSnap = NULL;
|
||||||
|
@ -4814,7 +4819,6 @@ static void clearSharedPtr(STsdbReader* p) {
|
||||||
|
|
||||||
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
|
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
|
||||||
pDst->status.pTableMap = pSrc->status.pTableMap;
|
pDst->status.pTableMap = pSrc->status.pTableMap;
|
||||||
pDst->status.pLDataIterArray = pSrc->status.pLDataIterArray;
|
|
||||||
pDst->status.uidList = pSrc->status.uidList;
|
pDst->status.uidList = pSrc->status.uidList;
|
||||||
pDst->pSchema = pSrc->pSchema;
|
pDst->pSchema = pSrc->pSchema;
|
||||||
pDst->pSchemaMap = pSrc->pSchemaMap;
|
pDst->pSchemaMap = pSrc->pSchemaMap;
|
||||||
|
|
Loading…
Reference in New Issue