fix(stream): return after get value.

This commit is contained in:
Haojun Liao 2024-07-17 11:17:27 +08:00
parent 872cae1386
commit 1ed620e702
3 changed files with 124 additions and 47 deletions

View File

@ -48,7 +48,7 @@ typedef struct {
static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo);
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRow);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, SRowKey* pKey,
STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
@ -2025,9 +2025,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline;
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
TSDBROW* pRow = NULL;
TSDBROW* piRow = NULL;
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader, &pRow);
getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader, &piRow);
SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
@ -2201,9 +2203,10 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) {
SRowKey rowKey = {0};
TSDBROW* pRow = NULL;
while (1) {
TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader);
getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader, &pRow);
if (!pIter->hasVal) {
break;
}
@ -2559,11 +2562,11 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
}
if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader, &pRow);
}
if (pBlockScanInfo->iiter.hasVal) {
piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader, &piRow);
}
// two levels of mem-table does contain the valid rows
@ -2810,13 +2813,16 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL};
bool hasKey = false, hasIKey = false;
TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
TSDBROW* pRow = NULL;
TSDBROW* pIRow = NULL;
getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader, &pRow);
if (pRow != NULL) {
hasKey = true;
key = TSDBROW_KEY(pRow);
}
TSDBROW* pIRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader, &pIRow);
if (pIRow != NULL) {
hasIKey = true;
ikey = TSDBROW_KEY(pIRow);
@ -3742,9 +3748,11 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
return false;
}
FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
FORCE_INLINE int32_t getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader, TSDBROW** pRes) {
*pRes = NULL;
if (!pIter->hasVal) {
return NULL;
return TSDB_CODE_SUCCESS;
}
int32_t order = pReader->info.order;
@ -3754,18 +3762,20 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S
TSDBROW_INIT_KEY(pRow, key);
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return NULL;
return TSDB_CODE_SUCCESS;
}
// it is a valid data version
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow;
*pRes = pRow;
return TSDB_CODE_SUCCESS;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
pReader->suppInfo.numOfPks > 0);
if (!dropped) {
return pRow;
*pRes = pRow;
return TSDB_CODE_SUCCESS;
}
}
}
@ -3773,7 +3783,7 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S
while (1) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
return NULL;
return TSDB_CODE_SUCCESS;
}
pRow = tsdbTbDataIterGet(pIter->iter);
@ -3781,17 +3791,19 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S
TSDBROW_INIT_KEY(pRow, key);
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return NULL;
return TSDB_CODE_SUCCESS;
}
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) {
if (pDelList == NULL || TARRAY_SIZE(pDelList) == 0) {
return pRow;
*pRes = pRow;
return TSDB_CODE_SUCCESS;
} else {
bool dropped = hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, order, &pReader->info.verRange,
pReader->suppInfo.numOfPks > 0);
if (!dropped) {
return pRow;
*pRes = pRow;
return TSDB_CODE_SUCCESS;
}
}
}
@ -3809,7 +3821,8 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArra
}
// data exists but not valid
TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
TSDBROW* pRow = NULL;
getValidMemRow(pIter, pDelList, pReader, &pRow);
if (pRow == NULL) {
break;
}
@ -3974,7 +3987,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
*freeTSRow = false;
return TSDB_CODE_SUCCESS;
} else { // has next point in mem/imem
pNextRow = getValidMemRow(pIter, pDelList, pReader);
getValidMemRow(pIter, pDelList, pReader, &pNextRow);
if (pNextRow == NULL) {
*pResRow = current;
*freeTSRow = false;
@ -4118,8 +4131,11 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow,
int64_t endKey, bool* freeTSRow) {
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* pRow = NULL;
TSDBROW* piRow = NULL;
getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader, &pRow);
getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader, &piRow);
SArray* pDelList = pBlockScanInfo->delSkyline;
uint64_t uid = pBlockScanInfo->uid;

View File

@ -37,7 +37,10 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
void* px = taosArrayPush(pBuf->pData, &p);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (remainder > 0) {
@ -45,7 +48,10 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
void* px = taosArrayPush(pBuf->pData, &p);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pBuf->numOfTables = numOfTables;
@ -86,7 +92,10 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
void* px = taosArrayPush(pBuf->pData, &p);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
if (remainder > 0) {
@ -94,7 +103,10 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
void* px = taosArrayPush(pBuf->pData, &p);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pBuf->numOfTables = numOfTables;
@ -214,7 +226,8 @@ void clearRowKey(SRowKey* pKey) {
taosMemoryFreeClear(pKey->pks[0].pData);
}
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
static int32_t initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
int32_t code = 0;
int32_t numOfPks = pReader->suppInfo.numOfPks;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int8_t type = pReader->suppInfo.pk.type;
@ -225,18 +238,37 @@ static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader
int64_t skey = pReader->info.window.skey;
int64_t ts = (skey > INT64_MIN) ? (skey - 1) : skey;
initRowKey(pRowKey, ts, numOfPks, type, bytes, asc);
initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, type, bytes, asc);
code = initRowKey(pRowKey, ts, numOfPks, type, bytes, asc);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, skey, numOfPks, type, bytes, asc);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
int64_t ekey = pReader->info.window.ekey;
int64_t ts = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
initRowKey(pRowKey, ts, numOfPks, type, bytes, asc);
initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, type, bytes, asc);
code = initRowKey(pRowKey, ts, numOfPks, type, bytes, asc);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = initRowKey(&pScanInfo->sttKeyInfo.nextProcKey, ekey, numOfPks, type, bytes, asc);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, type, bytes, asc);
initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, type, bytes, asc);
code = initRowKey(&pScanInfo->sttRange.skey, INT64_MAX, numOfPks, type, bytes, asc);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = initRowKey(&pScanInfo->sttRange.ekey, INT64_MIN, numOfPks, type, bytes, asc);
return code;
}
int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap,
@ -248,10 +280,13 @@ int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSH
pScanInfo->cleanSttBlocks = false;
pScanInfo->sttBlockReturned = false;
initLastProcKey(pScanInfo, pReader);
int32_t code = initLastProcKey(pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
int32_t code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -607,7 +642,10 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
for (int32_t i = 0; i < numOfBlocks; ++i) {
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList);
@ -640,11 +678,18 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
int32_t index = sup.indexPerTable[pos]++;
SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
taosArrayPush(pBlockIter->blockList, pBlockInfo);
void* px = taosArrayPush(pBlockIter->blockList, pBlockInfo);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// set data block index overflow, in order to disable the offset comparator
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
@ -752,7 +797,10 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pFileDelData, &delData);
void* px = taosArrayPush(pScanInfo->pFileDelData, &delData);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
}
@ -858,7 +906,10 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
p = pMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pMemDelData, p);
void* px = taosArrayPush(pMemDelData, p);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
p = p->pNext;
@ -870,7 +921,10 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
p = piMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pMemDelData, p);
void* px = taosArrayPush(pMemDelData, p);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
p = p->pNext;
}
@ -914,7 +968,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
}
if (index >= pStatisBlock->numOfRecords) {
tStatisBlockDestroy(pStatisBlock);
(void) tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock);
return num;
}
@ -924,7 +978,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
p = &pStatisBlkArray->data[i];
if (p->minTbid.suid > suid) {
tStatisBlockDestroy(pStatisBlock);
(void) tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock);
return num;
}
@ -944,7 +998,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
}
}
tStatisBlockDestroy(pStatisBlock);
(void) tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock);
return num;
}
@ -961,14 +1015,17 @@ static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlo
}
}
void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
int32_t size = taosArrayGetSize(pLDIterList);
if (size < numOfFileObj) {
int32_t inc = numOfFileObj - size;
for (int32_t k = 0; k < inc; ++k) {
SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
taosArrayPush(pLDIterList, &pIter);
void* px = taosArrayPush(pLDIterList, &pIter);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
} else if (size > numOfFileObj) { // remove unused LDataIter
int32_t inc = size - numOfFileObj;
@ -978,6 +1035,8 @@ void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
destroyLDataIter(pIter);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
@ -986,7 +1045,10 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet)
// add the list/iter placeholder
while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) {
SArray* pList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pSttFileBlockIterArray, &pList);
void* px = taosArrayPush(pSttFileBlockIterArray, &pList);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
for (int32_t j = 0; j < numOfLevels; ++j) {

View File

@ -247,7 +247,6 @@ typedef struct SDataBlockIter {
typedef struct SFileBlockDumpInfo {
int32_t totalRows;
int32_t rowIndex;
// STsdbRowKey lastKey; // this key should be removed
bool allDumped;
} SFileBlockDumpInfo;