refactor: remove the invalid return value.

This commit is contained in:
Haojun Liao 2024-07-16 18:52:01 +08:00
parent 19ba5af5e7
commit 94f596c4ce
12 changed files with 344 additions and 188 deletions

View File

@ -17,13 +17,13 @@
#define _TD_COMMON_MSG_CB_H_ #define _TD_COMMON_MSG_CB_H_
#include "os.h" #include "os.h"
#include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct SRpcMsg SRpcMsg; typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SRpcHandleInfo SRpcHandleInfo; typedef struct SRpcHandleInfo SRpcHandleInfo;
@ -46,7 +46,7 @@ typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg);
typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type);
typedef void (*ReportStartup)(const char* name, const char* desc); typedef void (*ReportStartup)(const char* name, const char* desc);

View File

@ -105,7 +105,7 @@ typedef struct SMTbCursor {
} SMTbCursor; } SMTbCursor;
typedef struct SMCtbCursor { typedef struct SMCtbCursor {
SMeta* pMeta; struct SMeta* pMeta;
void* pCur; void* pCur;
tb_uid_t suid; tb_uid_t suid;
void* pKey; void* pKey;
@ -134,7 +134,7 @@ typedef struct SMetaTableInfo {
} SMetaTableInfo; } SMetaTableInfo;
typedef struct SSnapContext { typedef struct SSnapContext {
SMeta* pMeta; struct SMeta* pMeta;
int64_t snapVersion; int64_t snapVersion;
void* pCur; void* pCur;
int64_t suid; int64_t suid;
@ -178,7 +178,7 @@ typedef struct TsdReader {
int32_t (*tsdNextDataBlock)(); int32_t (*tsdNextDataBlock)();
int32_t (*tsdReaderRetrieveBlockSMAInfo)(); int32_t (*tsdReaderRetrieveBlockSMAInfo)();
SSDataBlock *(*tsdReaderRetrieveDataBlock)(); int32_t (*tsdReaderRetrieveDataBlock)();
void (*tsdReaderReleaseDataBlock)(); void (*tsdReaderReleaseDataBlock)();

View File

@ -70,7 +70,6 @@ typedef int64_t SyncIndex;
typedef int64_t SyncTerm; typedef int64_t SyncTerm;
typedef struct SSyncNode SSyncNode; typedef struct SSyncNode SSyncNode;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry; typedef struct SSyncRaftEntry SSyncRaftEntry;
typedef enum { typedef enum {
@ -238,7 +237,7 @@ typedef struct SSyncInfo {
int32_t batchSize; int32_t batchSize;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SWal* pWal; struct SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
SMsgCb* msgcb; SMsgCb* msgcb;
int32_t pingMs; int32_t pingMs;

View File

@ -167,7 +167,7 @@ void tsdbReaderClose2(STsdbReader *pReader);
int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);
void tsdbReleaseDataBlock2(STsdbReader *pReader); void tsdbReleaseDataBlock2(STsdbReader *pReader);
SSDataBlock *tsdbRetrieveDataBlock2(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList);
int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle); int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle);

View File

@ -54,7 +54,6 @@ extern "C" {
#endif #endif
typedef struct SVnodeInfo SVnodeInfo; typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta;
typedef struct SSma SSma; typedef struct SSma SSma;
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STQ STQ; typedef struct STQ STQ;
@ -153,7 +152,6 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive); void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMStbCursor SMStbCursor; typedef struct SMStbCursor SMStbCursor;
typedef struct STbUidStore STbUidStore; typedef struct STbUidStore STbUidStore;

View File

@ -3212,7 +3212,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->pMemDelData = NULL; pIter->pMemDelData = NULL;
loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); code = loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};

File diff suppressed because it is too large Load Diff

View File

@ -842,9 +842,12 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false); return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false);
} }
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
if (*ppMemDelData == NULL) { if (*ppMemDelData == NULL) {
*ppMemDelData = taosArrayInit(4, sizeof(SDelData)); *ppMemDelData = taosArrayInit(4, sizeof(SDelData));
if (*ppMemDelData == NULL) {
return TSDB_CODE_SUCCESS;
}
} }
SArray* pMemDelData = *ppMemDelData; SArray* pMemDelData = *ppMemDelData;
@ -872,6 +875,8 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
p = p->pNext; p = p->pNext;
} }
} }
return TSDB_CODE_SUCCESS;
} }
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,

View File

@ -339,7 +339,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr); bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
// load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time) // load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time)
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,

View File

@ -388,9 +388,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalCheckedRows += pBlock->info.rows; pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1; pCost->loadBlocks += 1;
SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL); SSDataBlock* p = NULL;
if (p == NULL) { int32_t code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL);
return terrno; if (p == NULL || code != TSDB_CODE_SUCCESS) {
return code;
} }
ASSERT(p == pBlock); ASSERT(p == pBlock);
@ -1351,7 +1352,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
} }
if (hasNext) { if (hasNext) {
/*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL); SSDataBlock* p = NULL;
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
} }
@ -2929,8 +2935,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
SSDataBlock* pBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL); SSDataBlock* pBlock = NULL;
if (pBlock == NULL) { code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, &pBlock, NULL);
if (pBlock == NULL || code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno); T_LONG_JMP(pTaskInfo->env, terrno);
} }

View File

@ -74,16 +74,16 @@ typedef struct SRaftStore {
TdThreadMutex mutex; TdThreadMutex mutex;
} SRaftStore; } SRaftStore;
typedef struct SSyncHbTimerData { struct SSyncHbTimerData {
int64_t syncNodeRid; int64_t syncNodeRid;
SSyncTimer* pTimer; SSyncTimer* pTimer;
SRaftId destId; SRaftId destId;
uint64_t logicClock; uint64_t logicClock;
int64_t execTime; int64_t execTime;
int64_t rid; int64_t rid;
} SSyncHbTimerData; };
typedef struct SSyncTimer { struct SSyncTimer {
void* pTimer; void* pTimer;
TAOS_TMR_CALLBACK timerCb; TAOS_TMR_CALLBACK timerCb;
uint64_t logicClock; uint64_t logicClock;
@ -92,7 +92,7 @@ typedef struct SSyncTimer {
int64_t timeStamp; int64_t timeStamp;
SRaftId destId; SRaftId destId;
int64_t hbDataRid; int64_t hbDataRid;
} SSyncTimer; };
typedef struct SElectTimerParam { typedef struct SElectTimerParam {
uint64_t logicClock; uint64_t logicClock;
@ -106,7 +106,7 @@ typedef struct SPeerState {
int64_t lastSendTime; int64_t lastSendTime;
} SPeerState; } SPeerState;
typedef struct SSyncNode { struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
SyncGroupId vgId; SyncGroupId vgId;
SRaftCfg raftCfg; SRaftCfg raftCfg;
@ -116,7 +116,7 @@ typedef struct SSyncNode {
// sync io // sync io
SSyncLogBuffer* pLogBuf; SSyncLogBuffer* pLogBuf;
SWal* pWal; struct SWal* pWal;
const SMsgCb* msgcb; const SMsgCb* msgcb;
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
@ -234,7 +234,7 @@ typedef struct SSyncNode {
bool isStart; bool isStart;
} SSyncNode; };
// open/close -------------- // open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);

View File

@ -103,7 +103,7 @@ typedef void* queue[2];
#define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_MAGIC_NUM 0x5f375a86
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
typedef SRpcMsg STransMsg; typedef struct SRpcMsg STransMsg;
typedef SRpcCtx STransCtx; typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal; typedef SRpcCtxVal STransCtxVal;
typedef SRpcInfo STrans; typedef SRpcInfo STrans;