refactor: remove void.
This commit is contained in:
parent
199de25410
commit
06f42443cb
|
@ -207,7 +207,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
|
|
||||||
// send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
|
// send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
|
||||||
(void)streamTaskSendCheckpointsourceRsp(pTask);
|
code = streamTaskSendCheckpointsourceRsp(pTask);
|
||||||
|
if (code) {
|
||||||
|
tqError("%s failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
|
||||||
|
}
|
||||||
streamTaskResetStatus(pTask);
|
streamTaskResetStatus(pTask);
|
||||||
|
|
||||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
|
@ -806,25 +809,26 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
|
|
||||||
int32_t type = pReq->reqType;
|
int32_t type = pReq->reqType;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (type == STREAM_EXEC_T_START_ONE_TASK) {
|
if (type == STREAM_EXEC_T_START_ONE_TASK) {
|
||||||
(void)streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
|
code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
return 0;
|
return 0;
|
||||||
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
|
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
|
||||||
(void)streamMetaStartAllTasks(pMeta);
|
code = streamMetaStartAllTasks(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
|
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
|
||||||
(void)restartStreamTasks(pMeta, isLeader);
|
code = restartStreamTasks(pMeta, isLeader);
|
||||||
return 0;
|
return 0;
|
||||||
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
||||||
(void)streamMetaStopAllTasks(pMeta);
|
code = streamMetaStopAllTasks(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||||
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
return code;
|
return code;
|
||||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
|
|
||||||
if (pTask != NULL && (code == 0)) {
|
if (pTask != NULL && (code == 0)) {
|
||||||
char* pStatus = NULL;
|
char* pStatus = NULL;
|
||||||
|
@ -846,7 +850,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
|
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
if (streamTaskReadyToRun(pTask, &p)) {
|
if (streamTaskReadyToRun(pTask, &p)) {
|
||||||
|
@ -864,7 +868,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
|
||||||
// todo add one function to handle this
|
// todo add one function to handle this
|
||||||
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
|
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1229,7 +1233,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
tqError(
|
tqError(
|
||||||
"vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
"vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, req.taskId);
|
pMeta->vgId, req.taskId);
|
||||||
(void)streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
// ignore this code to avoid error code over write
|
||||||
|
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -379,6 +379,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
void *px = NULL;
|
void *px = NULL;
|
||||||
int32_t startIndex = 0;
|
int32_t startIndex = 0;
|
||||||
|
double el = 0;
|
||||||
|
|
||||||
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
int32_t numOfBlocks = TARRAY2_SIZE(pStatisBlkArray);
|
||||||
if (numOfBlocks <= 0) {
|
if (numOfBlocks <= 0) {
|
||||||
|
@ -489,7 +490,9 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
||||||
} else {
|
} else {
|
||||||
STbStatisRecord record = {0};
|
STbStatisRecord record = {0};
|
||||||
while (i < rows) {
|
while (i < rows) {
|
||||||
(void)tStatisBlockGet(&block, i, &record);
|
code = tStatisBlockGet(&block, i, &record);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (record.suid != suid) {
|
if (record.suid != suid) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -536,12 +539,16 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
(void)tStatisBlockDestroy(&block);
|
el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
pBlockLoadInfo->cost.statisElapsedTime += el;
|
pBlockLoadInfo->cost.statisElapsedTime += el;
|
||||||
|
|
||||||
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
|
if (code != 0) {
|
||||||
|
tsdbError("%s failed to load block data statistics, %s at line:%d, code:%s", id, __func__, lino, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
tsdbDebug("%s load %d statis blocks into buf, elapsed time:%.2fms", id, num, el);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = tStatisBlockDestroy(&block);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -677,7 +684,11 @@ int32_t tLDataIterOpen2(SLDataIter *pIter, SSttFileReader *pSttFileReader, int32
|
||||||
}
|
}
|
||||||
|
|
||||||
void tLDataIterClose2(SLDataIter *pIter) {
|
void tLDataIterClose2(SLDataIter *pIter) {
|
||||||
(void)tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
int32_t code = tsdbSttFileReaderClose(&pIter->pReader); // always return 0
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("%" PRId64 " failed to close tsdb file reader, code:%s", pIter->cid, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
pIter->pReader = NULL;
|
pIter->pReader = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -449,10 +449,14 @@ static int32_t tsdbUninitReaderLock(STsdbReader* pReader) {
|
||||||
|
|
||||||
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
|
static int32_t tsdbAcquireReader(STsdbReader* pReader) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
tsdbTrace("tsdb/read: %s, pre-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code);
|
||||||
|
|
||||||
code = taosThreadMutexLock(&pReader->readerMutex);
|
code = taosThreadMutexLock(&pReader->readerMutex);
|
||||||
tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
|
if (code != 0) {
|
||||||
|
tsdbError("tsdb/read:%p, failed to lock reader mutex, code:%s", pReader->idStr, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
tsdbTrace("tsdb/read: %s, post-take read mutex: %p, code: %d", pReader->idStr, &pReader->readerMutex, code);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -4574,7 +4578,10 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
|
||||||
STableBlockScanInfo** p = NULL;
|
STableBlockScanInfo** p = NULL;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
|
|
||||||
(void)tsdbAcquireReader(pReader);
|
code = tsdbAcquireReader(pReader);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
|
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
|
||||||
clearBlockScanInfo(*p);
|
clearBlockScanInfo(*p);
|
||||||
|
@ -4805,7 +4812,10 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tsdbAcquireReader(pReader);
|
int32_t code = tsdbAcquireReader(pReader);
|
||||||
|
if (code) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) {
|
if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) {
|
||||||
|
@ -5853,6 +5863,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb* pTsdb = pReader->pTsdb;
|
STsdb* pTsdb = pReader->pTsdb;
|
||||||
SVersionRange* pRange = &pReader->info.verRange;
|
SVersionRange* pRange = &pReader->info.verRange;
|
||||||
|
int32_t lino = 0;
|
||||||
*ppSnap = NULL;
|
*ppSnap = NULL;
|
||||||
|
|
||||||
// lock
|
// lock
|
||||||
|
@ -5866,8 +5877,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
|
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
|
||||||
if (pSnap == NULL) {
|
if (pSnap == NULL) {
|
||||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||||
code = terrno;
|
TSDB_CHECK_NULL(pSnap, code, lino, _exit, terrno);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// take snapshot
|
// take snapshot
|
||||||
|
@ -5876,14 +5886,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
|
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
|
||||||
if (pSnap->pNode == NULL) {
|
if (pSnap->pNode == NULL) {
|
||||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||||
code = terrno;
|
TSDB_CHECK_NULL(pSnap->pNode, code, lino, _exit, terrno);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pSnap->pNode->pQHandle = pReader;
|
pSnap->pNode->pQHandle = pReader;
|
||||||
pSnap->pNode->reseek = reseek;
|
pSnap->pNode->reseek = reseek;
|
||||||
|
|
||||||
(void)tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
|
code = tsdbRefMemTable(pTsdb->mem, pSnap->pNode);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
|
if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) {
|
||||||
|
@ -5903,7 +5913,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
pSnap->pINode->pQHandle = pReader;
|
pSnap->pINode->pQHandle = pReader;
|
||||||
pSnap->pINode->reseek = reseek;
|
pSnap->pINode->reseek = reseek;
|
||||||
|
|
||||||
(void)tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
|
code = tsdbRefMemTable(pTsdb->imem, pSnap->pINode);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// fs
|
// fs
|
||||||
|
@ -5918,8 +5929,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
}
|
}
|
||||||
|
|
||||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||||
goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);}
|
||||||
}
|
|
||||||
|
|
||||||
// unlock
|
// unlock
|
||||||
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
(void) taosThreadMutexUnlock(&pTsdb->mutex);
|
||||||
|
@ -5929,7 +5939,8 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tsdbError("vgId:%d take read snapshot failed, code:%s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
tsdbError("%s vgId:%d take read snapshot failed, line:%d code:%s", pReader->idStr, TD_VID(pTsdb->pVnode), lino,
|
||||||
|
tstrerror(code));
|
||||||
if (pSnap) {
|
if (pSnap) {
|
||||||
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
|
||||||
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
|
||||||
|
|
|
@ -779,8 +779,9 @@ typedef enum {
|
||||||
BLK_CHECK_QUIT = 0x2,
|
BLK_CHECK_QUIT = 0x2,
|
||||||
} ETombBlkCheckEnum;
|
} ETombBlkCheckEnum;
|
||||||
|
|
||||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j);
|
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i,
|
||||||
|
int32_t* j);
|
||||||
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
|
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
|
||||||
ETombBlkCheckEnum* pRet) {
|
ETombBlkCheckEnum* pRet) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -912,7 +913,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
|
||||||
ETombBlkCheckEnum ret = 0;
|
ETombBlkCheckEnum ret = 0;
|
||||||
code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);
|
code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);
|
||||||
|
|
||||||
(void)tTombBlockDestroy(&block);
|
tTombBlockDestroy(&block);
|
||||||
if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) {
|
if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -994,11 +995,17 @@ int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piM
|
||||||
|
|
||||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||||
int32_t numOfTables) {
|
int32_t numOfTables, int32_t* pNumOfRows) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
if (pNumOfRows != 0) {
|
||||||
|
*pNumOfRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
@ -1007,18 +1014,19 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStatisBlk* p = &pStatisBlkArray->data[i];
|
SStatisBlk* p = &pStatisBlkArray->data[i];
|
||||||
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
|
||||||
(void)tStatisBlockInit(pStatisBlock);
|
TSDB_CHECK_NULL(pStatisBlock, code, lino, _err, terrno);
|
||||||
|
|
||||||
|
code = tStatisBlockInit(pStatisBlock);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _err);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
|
code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(code, lino, _err);
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
pBlockLoadInfo->cost.loadStatisBlocks += 1;
|
||||||
|
@ -1030,9 +1038,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index >= pStatisBlock->numOfRecords) {
|
if (index >= pStatisBlock->numOfRecords) {
|
||||||
(void)tStatisBlockDestroy(pStatisBlock);
|
code = tStatisBlockDestroy(pStatisBlock);
|
||||||
taosMemoryFreeClear(pStatisBlock);
|
taosMemoryFreeClear(pStatisBlock);
|
||||||
return num;
|
*pNumOfRows = num;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t j = index;
|
int32_t j = index;
|
||||||
|
@ -1040,9 +1049,10 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
||||||
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
|
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
|
||||||
p = &pStatisBlkArray->data[i];
|
p = &pStatisBlkArray->data[i];
|
||||||
if (p->minTbid.suid > suid) {
|
if (p->minTbid.suid > suid) {
|
||||||
(void)tStatisBlockDestroy(pStatisBlock);
|
code = tStatisBlockDestroy(pStatisBlock);
|
||||||
taosMemoryFreeClear(pStatisBlock);
|
taosMemoryFreeClear(pStatisBlock);
|
||||||
return num;
|
*pNumOfRows = num;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t uid = pUidList[uidIndex];
|
uint64_t uid = pUidList[uidIndex];
|
||||||
|
@ -1051,30 +1061,44 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
|
||||||
num += ((int64_t*)pStatisBlock->counts.data)[j];
|
num += ((int64_t*)pStatisBlock->counts.data)[j];
|
||||||
uidIndex += 1;
|
uidIndex += 1;
|
||||||
j += 1;
|
j += 1;
|
||||||
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _err);
|
||||||
} else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) {
|
} else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) {
|
||||||
j += 1;
|
j += 1;
|
||||||
loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
code = loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _err);
|
||||||
} else {
|
} else {
|
||||||
uidIndex += 1;
|
uidIndex += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)tStatisBlockDestroy(pStatisBlock);
|
tStatisBlockDestroy(pStatisBlock);
|
||||||
taosMemoryFreeClear(pStatisBlock);
|
taosMemoryFreeClear(pStatisBlock);
|
||||||
return num;
|
*pNumOfRows = num;
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("%p failed to get number of rows in stt block, %s at line:%d code:%s", pSttFileReader, __func__, lino,
|
||||||
|
tstrerror(code));
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// load next stt statistics block
|
// load next stt statistics block
|
||||||
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
static int32_t loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlock* pStatisBlock,
|
||||||
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
|
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
|
||||||
if ((*j) >= numOfRows) {
|
if ((*j) >= numOfRows) {
|
||||||
(*i) += 1;
|
(*i) += 1;
|
||||||
(*j) = 0;
|
(*j) = 0;
|
||||||
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
|
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
(void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
|
int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("%p failed to read statisBlock, code:%s", pSttFileReader, tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
|
int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
|
||||||
|
@ -1191,8 +1215,13 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra
|
||||||
STsdbReader* pReader = pConf->pReader;
|
STsdbReader* pReader = pConf->pReader;
|
||||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||||
uint64_t* pUidList = pReader->status.uidList.tableUidList;
|
uint64_t* pUidList = pReader->status.uidList.tableUidList;
|
||||||
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList,
|
int32_t n = 0;
|
||||||
numOfTables);
|
code = getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pStatisBlkArray, pConf->suid, pUidList,
|
||||||
|
numOfTables, &n);
|
||||||
|
numOfRows += n;
|
||||||
|
if (code) {
|
||||||
|
tsdbError("%s failed to get rows in stt blocks, code:%s", pstr, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -343,7 +343,7 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
||||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||||
int32_t numOfTables);
|
int32_t numOfTables, int32_t* pNumOfRows);
|
||||||
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record);
|
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record);
|
||||||
|
|
||||||
void destroyLDataIter(SLDataIter* pIter);
|
void destroyLDataIter(SLDataIter* pIter);
|
||||||
|
|
Loading…
Reference in New Issue