Merge pull request #26881 from taosdata/fix/create_tb
fix(tsdb): check return value
This commit is contained in:
commit
2c73757aa5
|
@ -301,8 +301,10 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
|
|||
|
||||
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
uint64_t suid) {
|
||||
void *px = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (TARRAY2_SIZE(pArray) <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
SSttBlk *pStart = &pArray->data[0];
|
||||
|
@ -316,10 +318,17 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
} else { // all blocks are qualified
|
||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||
taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
|
||||
px = taosArrayAddBatch(pBlockLoadInfo->aSttBlk, pArray->data, pArray->size);
|
||||
if (px == NULL){
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
SArray *pTmp = taosArrayInit(TARRAY2_SIZE(pArray), sizeof(SSttBlk));
|
||||
if (pTmp == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TARRAY2_SIZE(pArray); ++i) {
|
||||
SSttBlk *p = &pArray->data[i];
|
||||
if (p->suid < suid) {
|
||||
|
@ -327,7 +336,11 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
}
|
||||
|
||||
if (p->suid == suid) {
|
||||
taosArrayPush(pTmp, p);
|
||||
void* px = taosArrayPush(pTmp, p);
|
||||
if (px == NULL) {
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
} else if (p->suid > suid) {
|
||||
break;
|
||||
}
|
||||
|
@ -337,7 +350,7 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
pBlockLoadInfo->aSttBlk = pTmp;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tValueDupPayload(SValue *pVal) {
|
||||
|
@ -357,9 +370,11 @@ static int32_t tValueDupPayload(SValue *pVal) {
|
|||
|
||||
static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
TStatisBlkArray *pStatisBlkArray, uint64_t suid, const char *id) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void* px = NULL;
|
||||
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
||||
if (numOfBlocks <= 0) {
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t startIndex = 0;
|
||||
|
@ -385,7 +400,10 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
for (int32_t k = startIndex; k < endIndex; ++k) {
|
||||
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
|
||||
code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t rows = block.numOfRecords;
|
||||
|
@ -409,21 +427,43 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
int32_t size = rows - i;
|
||||
int32_t offset = i * sizeof(int64_t);
|
||||
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, offset), size);
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
|
||||
taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, offset), size);
|
||||
px = taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, offset), size);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayAddBatch(pBlockLoadInfo->info.pFirstTs, tBufferGetDataAt(&block.firstKeyTimestamps, offset), size);
|
||||
if (px == NULL){
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayAddBatch(pBlockLoadInfo->info.pLastTs, tBufferGetDataAt(&block.lastKeyTimestamps, offset), size);
|
||||
if (px == NULL){
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, offset), size);
|
||||
if (px == NULL){
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (block.numOfPKs > 0) {
|
||||
SValue vFirst = {0}, vLast = {0};
|
||||
for (int32_t f = i; f < rows; ++f) {
|
||||
int32_t code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
|
||||
code = tValueColumnGet(&block.firstKeyPKs[0], f, &vFirst);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
|
||||
tValueDupPayload(&vFirst);
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
||||
code = tValueDupPayload(&vFirst);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// todo add api to clone the original data
|
||||
code = tValueColumnGet(&block.lastKeyPKs[0], f, &vLast);
|
||||
|
@ -431,14 +471,28 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
break;
|
||||
}
|
||||
|
||||
tValueDupPayload(&vLast);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
|
||||
code = tValueDupPayload(&vLast);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vLast);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
SValue vFirst = {0};
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &vFirst);
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &vFirst);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &vFirst);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -450,24 +504,59 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
break;
|
||||
}
|
||||
|
||||
taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid);
|
||||
taosArrayPush(pBlockLoadInfo->info.pCount, &record.count);
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pUid, &record.uid);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pCount, &record.count);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pFirstTs, &record.firstKey.ts);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pLastTs, &record.lastKey.ts);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (record.firstKey.numOfPKs > 0) {
|
||||
SValue s = record.firstKey.pks[0];
|
||||
tValueDupPayload(&s);
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
|
||||
code = tValueDupPayload(&s);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &s);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
s = record.lastKey.pks[0];
|
||||
tValueDupPayload(&s);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
|
||||
code = tValueDupPayload(&s);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &s);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
SValue v = {0};
|
||||
taosArrayPush(pBlockLoadInfo->info.pFirstKey, &v);
|
||||
taosArrayPush(pBlockLoadInfo->info.pLastKey, &v);
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pFirstKey, &v);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
px = taosArrayPush(pBlockLoadInfo->info.pLastKey, &v);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
i += 1;
|
||||
|
@ -482,7 +571,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
|||
pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||
|
||||
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter *pIter, int64_t suid,
|
||||
|
@ -617,7 +706,7 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
|||
}
|
||||
|
||||
void tLDataIterClose2(SLDataIter *pIter) {
|
||||
tsdbSttFileReaderClose(&pIter->pReader);
|
||||
(void) tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||
pIter->pReader = NULL;
|
||||
}
|
||||
|
||||
|
@ -890,7 +979,10 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
goto _end;
|
||||
}
|
||||
|
||||
adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
|
||||
code = adjustSttDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
|
||||
if (code) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
|
||||
|
@ -940,7 +1032,10 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
|||
|
||||
// let's record the time window for current table of uid in the stt files
|
||||
if (pSttDataInfo != NULL && numOfRows > 0) {
|
||||
taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
|
||||
void* px = taosArrayPush(pSttDataInfo->pKeyRangeList, &range);
|
||||
if (px == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
pSttDataInfo->numOfRows += numOfRows;
|
||||
}
|
||||
} else {
|
||||
|
@ -958,7 +1053,7 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { (void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
||||
|
||||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
|
||||
|
||||
|
@ -1035,7 +1130,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
|
|||
if (pMTree->pIter && pIter) {
|
||||
int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
|
||||
if (c > 0) {
|
||||
tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
|
||||
(void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
|
||||
pMTree->pIter = NULL;
|
||||
} else {
|
||||
ASSERT(c);
|
||||
|
|
Loading…
Reference in New Issue