fix:fix coverity issue and remove unnecessary files.
This commit is contained in:
parent
d9f6cf3966
commit
9b70c0c457
|
@ -2327,65 +2327,5 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
taosThreadMutexUnlock(&execNodeList.lock);
|
||||
|
||||
taosArrayDestroy(req.pTaskStatus);
|
||||
|
||||
// bool nodeChanged = false;
|
||||
// SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||
/*
|
||||
// record the timeout node
|
||||
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
|
||||
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
|
||||
int64_t duration = now - pEntry->hbTimestamp;
|
||||
if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
|
||||
taosArrayPush(pList, &pEntry);
|
||||
mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->nodeId != req.vgId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pEntry->hbTimestamp = now;
|
||||
|
||||
// check epset to identify whether the node has been transferred to other dnodes.
|
||||
// node the epset is changed, which means the node transfer has occurred for this node.
|
||||
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
|
||||
// nodeChanged = true;
|
||||
// break;
|
||||
// }
|
||||
}
|
||||
|
||||
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
|
||||
// to identify whether the dnode is truely offline or not.
|
||||
|
||||
// handle the node changed case
|
||||
if (!nodeChanged) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t nodeId = req.vgId;
|
||||
|
||||
{// check all streams that involved this vnode should update the epset info
|
||||
SStreamObj *pStream = NULL;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
// update the related upstream and downstream tasks, todo remove this, no need this function
|
||||
taosWLockLatch(&pStream->lock);
|
||||
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
|
||||
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
||||
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo
|
||||
//// }
|
||||
// }
|
||||
}
|
||||
*/
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -186,7 +186,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle);
|
|||
void *tsdbGetIdx2(SMeta *pMeta);
|
||||
void *tsdbGetIvtIdx2(SMeta *pMeta);
|
||||
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
|
||||
void tsdbReaderSetCloseFlag2(STsdbReader *pReader);
|
||||
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
|
||||
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
|
||||
//======================================================================================================================
|
||||
|
||||
|
|
|
@ -302,12 +302,11 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
|||
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer);
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
||||
// tsdbRead.c ==============================================================================================
|
||||
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
|
||||
void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
|
||||
|
||||
// tsdbRead.c ==============================================================================================
|
||||
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
|
||||
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
|
||||
|
||||
// tsdbMerge.c ==============================================================================================
|
||||
int32_t tsdbMerge(void *arg);
|
||||
|
||||
|
@ -830,7 +829,6 @@ bool tMergeTreeNext(SMergeTree *pMTree);
|
|||
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||
void tMergeTreeClose(SMergeTree *pMTree);
|
||||
|
||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
|
||||
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
|
||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
|
||||
|
|
|
@ -22,38 +22,6 @@
|
|||
static void tLDataIterClose2(SLDataIter *pIter);
|
||||
|
||||
// SLDataIter =================================================
|
||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
|
||||
int32_t numOfSttTrigger) {
|
||||
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
|
||||
if (pLoadInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfSttTrigger; ++i) {
|
||||
pLoadInfo[i].blockIndex[0] = -1;
|
||||
pLoadInfo[i].blockIndex[1] = -1;
|
||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||
|
||||
int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
}
|
||||
|
||||
code = tBlockDataCreate(&pLoadInfo[i].blockData[1]);
|
||||
if (code) {
|
||||
terrno = code;
|
||||
}
|
||||
|
||||
pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
|
||||
pLoadInfo[i].pSchema = pSchema;
|
||||
pLoadInfo[i].colIds = colList;
|
||||
pLoadInfo[i].numOfCols = numOfCols;
|
||||
}
|
||||
|
||||
return pLoadInfo;
|
||||
}
|
||||
|
||||
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
|
||||
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
|
||||
if (pLoadInfo == NULL) {
|
||||
|
@ -83,25 +51,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
|
|||
return pLoadInfo;
|
||||
}
|
||||
|
||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
pLoadInfo[i].currentLoadBlockIndex = 1;
|
||||
pLoadInfo[i].blockIndex[0] = -1;
|
||||
pLoadInfo[i].blockIndex[1] = -1;
|
||||
|
||||
taosArrayClear(pLoadInfo[i].aSttBlk);
|
||||
|
||||
pLoadInfo[i].cost.loadBlocks = 0;
|
||||
pLoadInfo[i].cost.blockElapsedTime = 0;
|
||||
pLoadInfo[i].cost.statisElapsedTime = 0;
|
||||
pLoadInfo[i].cost.loadStatisBlocks = 0;
|
||||
pLoadInfo[i].statisBlockIndex = -1;
|
||||
tStatisBlockDestroy(pLoadInfo[i].statisBlock);
|
||||
|
||||
pLoadInfo[i].sttBlockLoaded = false;
|
||||
}
|
||||
}
|
||||
|
||||
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pLoadCost) {
|
||||
for (int32_t i = 0; i < 1; ++i) {
|
||||
pLoadCost->blockElapsedTime += pLoadInfo[i].cost.blockElapsedTime;
|
||||
|
@ -309,12 +258,6 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
const char *idStr, bool strictTimeRange) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
uint64_t suid) {
|
||||
if (TARRAY2_SIZE(pArray) <= 0) {
|
||||
|
@ -767,50 +710,6 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
|
|||
return -1 * tLDataIterCmprFn(p1, p2);
|
||||
}
|
||||
|
||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
|
||||
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pMTree->backward = backward;
|
||||
pMTree->pIter = NULL;
|
||||
pMTree->idStr = idStr;
|
||||
|
||||
if (!pMTree->backward) { // asc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||
} else { // desc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
|
||||
}
|
||||
|
||||
pMTree->pLoadInfo = pBlockLoadInfo;
|
||||
pMTree->destroyLoadInfo = destroyLoadInfo;
|
||||
pMTree->ignoreEarlierTs = false;
|
||||
|
||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
|
||||
code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
|
||||
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
|
||||
if (hasVal) {
|
||||
tMergeTreeAddIter(pMTree, &pLDataIter[i]);
|
||||
} else {
|
||||
if (!pMTree->ignoreEarlierTs) {
|
||||
pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_end:
|
||||
tMergeTreeClose(pMTree);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2760,6 +2760,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
|
|||
goto _end;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
|
||||
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
|
||||
|
@ -2788,6 +2789,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
|
|||
// pReader->rowsNum += block.nRow;
|
||||
// }
|
||||
}
|
||||
#endif
|
||||
|
||||
_end:
|
||||
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
|
||||
|
@ -4453,7 +4455,11 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
|
|||
// do fill all null column value SMA info
|
||||
int32_t i = 0, j = 0;
|
||||
int32_t size = (int32_t)TARRAY2_SIZE(&pSup->colAggArray);
|
||||
TARRAY2_INSERT_PTR(&pSup->colAggArray, 0, pTsAgg);
|
||||
int32_t code = TARRAY2_INSERT_PTR(&pSup->colAggArray, 0, pTsAgg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
size++;
|
||||
|
||||
while (j < numOfCols && i < size) {
|
||||
|
@ -4466,7 +4472,11 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
|
|||
} else if (pSup->colId[j] < pAgg->colId) {
|
||||
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
|
||||
TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
|
||||
code = TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
i += 1;
|
||||
size++;
|
||||
}
|
||||
|
@ -4477,7 +4487,11 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
|
|||
while (j < numOfCols) {
|
||||
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
|
||||
TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
|
||||
code = TARRAY2_INSERT_PTR(&pSup->colAggArray, i, &nullColAgg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
j++;
|
||||
|
@ -4835,7 +4849,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
|
|||
return rows;
|
||||
}
|
||||
|
||||
int32_t tsdbGetTableSchema2(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
||||
int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
|
||||
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
|
||||
|
@ -4970,4 +4984,4 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
|||
pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr;
|
||||
}
|
||||
|
||||
void tsdbReaderSetCloseFlag2(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
|
||||
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }
|
||||
|
|
|
@ -6997,8 +6997,8 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
strcpy(col->tableAlias, pTable);
|
||||
strcpy(col->colName, pMeta->schema[0].name);
|
||||
tstrncpy(col->tableAlias, pTable, tListLen(col->tableAlias));
|
||||
tstrncpy(col->colName, pMeta->schema[0].name, tListLen(col->colName));
|
||||
SNodeList* pParamterList = nodesMakeList();
|
||||
if (NULL == pParamterList) {
|
||||
nodesDestroyNode((SNode*)col);
|
||||
|
|
Loading…
Reference in New Issue