diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c09e96d16d..862bbee776 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -32,8 +32,8 @@ typedef struct SBlockOrderInfo { SColumnInfoData* pColData; } SBlockOrderInfo; -#define BLOCK_VERSION_1 1 -#define BLOCK_VERSION_2 2 +#define BLOCK_VERSION_1 1 +#define BLOCK_VERSION_2 2 #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) @@ -46,9 +46,9 @@ typedef struct SBlockOrderInfo { BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) -#define colDataSetNull_f_s(c_, r_) \ - do { \ - colDataSetNull_f((c_)->nullbitmap, r_); \ +#define colDataSetNull_f_s(c_, r_) \ + do { \ + colDataSetNull_f((c_)->nullbitmap, r_); \ (void)memset(((char*)(c_)->pData) + (c_)->info.bytes * (r_), 0, (c_)->info.bytes); \ } while (0) @@ -143,7 +143,7 @@ static FORCE_INLINE void colDataSetNNULL(SColumnInfoData* pColumnInfoData, uint3 for (int32_t i = start; i < start + nRows; ++i) { colDataSetNull_f(pColumnInfoData->nullbitmap, i); } - memset(pColumnInfoData->pData + start * pColumnInfoData->info.bytes, 0, pColumnInfoData->info.bytes * nRows); + (void)memset(pColumnInfoData->pData + start * pColumnInfoData->info.bytes, 0, pColumnInfoData->info.bytes * nRows); } pColumnInfoData->hasNull = true; @@ -192,15 +192,17 @@ int32_t getJsonValueLen(const char* data); int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); -int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); -void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows); -int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, - uint32_t numOfRows, bool isNull); +int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, + bool trimValue); +void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows); +int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, + bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo); -int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows); +int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, + int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, bool asc); @@ -214,7 +216,7 @@ size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows); -void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows); +void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows); int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); @@ -237,12 +239,12 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); -void blockDataCleanup(SSDataBlock* pDataBlock); -void blockDataReset(SSDataBlock* pDataBlock); -void blockDataEmpty(SSDataBlock* pDataBlock); +void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); +void blockDataCleanup(SSDataBlock* pDataBlock); +void blockDataReset(SSDataBlock* pDataBlock); +void blockDataEmpty(SSDataBlock* pDataBlock); int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, - bool clearPayload); + bool clearPayload); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize); @@ -264,20 +266,20 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); -int32_t blockGetEncodeSize(const SSDataBlock* pBlock); -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); +int32_t blockGetEncodeSize(const SSDataBlock* pBlock); +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); const char* blockDecode(SSDataBlock* pBlock, const char* pData); // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr); -int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId, - tb_uid_t suid); +int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, + int64_t uid, int32_t vgId, tb_uid_t suid); -bool alreadyAddGroupId(char* ctbName, int64_t groupId); -bool isAutoTableName(char* ctbName); -void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId); -char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); +bool alreadyAddGroupId(char* ctbName, int64_t groupId); +bool isAutoTableName(char* ctbName); +void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId); +char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList); diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 04e13fbdb3..0acb28fabd 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -315,7 +315,7 @@ struct STag { do { \ VarDataLenT __len = (VarDataLenT)strlen(str); \ *(VarDataLenT *)(x) = __len; \ - memcpy(varDataVal(x), (str), __len); \ + (void)memcpy(varDataVal(x), (str), __len); \ } while (0); #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ @@ -324,10 +324,10 @@ struct STag { varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \ } while (0) -#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ - do { \ - *(VarDataLenT *)(x) = (VarDataLenT)(_size); \ - memcpy(varDataVal(x), (str), (_size)); \ +#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ + do { \ + *(VarDataLenT *)(x) = (VarDataLenT)(_size); \ + (void)memcpy(varDataVal(x), (str), (_size)); \ } while (0); // STSchema ================================ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0629e8ee37..878543faa6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -691,7 +691,7 @@ static FORCE_INLINE SColCmprWrapper* tCloneSColCmprWrapper(const SColCmprWrapper int32_t size = sizeof(SColCmpr) * pDstWrapper->nCols; pDstWrapper->pColCmpr = (SColCmpr*)taosMemoryCalloc(1, size); - memcpy(pDstWrapper->pColCmpr, pSrcWrapper->pColCmpr, size); + (void)memcpy(pDstWrapper->pColCmpr, pSrcWrapper->pColCmpr, size); return pDstWrapper; } @@ -732,7 +732,7 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p return NULL; } - memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); + (void)memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); return pSW; } @@ -2837,13 +2837,13 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR buf = taosDecodeFixedI32(buf, &topicNum); pReq->topicNames = taosArrayInit(topicNum, sizeof(void*)); - if (pReq->topicNames == NULL){ + if (pReq->topicNames == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < topicNum; i++) { char* name = NULL; buf = taosDecodeString(buf, &name); - if (taosArrayPush(pReq->topicNames, &name) == NULL){ + if (taosArrayPush(pReq->topicNames, &name) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } } diff --git a/include/common/ttszip.h b/include/common/ttszip.h deleted file mode 100644 index 8eb99bd45e..0000000000 --- a/include/common/ttszip.h +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_COMMON_TTSZIP_H_ -#define _TD_COMMON_TTSZIP_H_ - -#include "os.h" -#include "tdef.h" -#include "tvariant.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define MEM_BUF_SIZE (1 << 20) -#define TS_COMP_FILE_MAGIC 0x87F5EC4C -#define TS_COMP_FILE_GROUP_MAX 512 - -typedef struct STSList { - char* rawBuf; - int32_t allocSize; - int32_t threshold; - int32_t len; -} STSList; - -typedef struct STSElem { - TSKEY ts; - SVariant* tag; - int32_t id; -} STSElem; - -typedef struct STSCursor { - int32_t vgroupIndex; - int32_t blockIndex; - int32_t tsIndex; - uint32_t order; -} STSCursor; - -typedef struct STSBlock { - SVariant tag; // tag value - int32_t numOfElem; // number of elements - int32_t compLen; // size after compressed - int32_t padding; // 0xFFFFFFFF by default, after the payload - char* payload; // actual data that is compressed -} STSBlock; - -/* - * The size of buffer file should not be greater than 2G, - * and the offset of int32_t type is enough - */ -typedef struct STSGroupBlockInfo { - int32_t id; // group id - int32_t offset; // offset set value in file - int32_t numOfBlocks; // number of total blocks - int32_t compLen; // compressed size -} STSGroupBlockInfo; - -typedef struct STSGroupBlockInfoEx { - STSGroupBlockInfo info; - int32_t len; // length before compress -} STSGroupBlockInfoEx; - -typedef struct STSBuf { - TdFilePtr pFile; - char path[PATH_MAX]; - uint32_t fileSize; - - // todo use array - STSGroupBlockInfoEx* pData; - uint32_t numOfAlloc; - uint32_t numOfGroups; - - char* assistBuf; - int32_t bufSize; - STSBlock block; - STSList tsData; // uncompressed raw ts data - uint64_t numOfTotal; - bool autoDelete; - bool remainOpen; - int32_t tsOrder; // order of timestamp in ts comp buffer - STSCursor cur; -} STSBuf; - -typedef struct STSBufFileHeader { - uint32_t magic; // file magic number - uint32_t numOfGroup; // number of group stored in current file - int32_t tsOrder; // timestamp order in current file -} STSBufFileHeader; - -STSBuf* tsBufCreate(bool autoDelete, int32_t order); -STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); -STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder, int32_t id); - -void* tsBufDestroy(STSBuf* pTSBuf); - -void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len); -int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf); - -STSBuf* tsBufClone(STSBuf* pTSBuf); - -STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id); - -void tsBufFlush(STSBuf* pTSBuf); -void tsBufResetPos(STSBuf* pTSBuf); -bool tsBufNextPos(STSBuf* pTSBuf); - -STSElem tsBufGetElem(STSBuf* pTSBuf); -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag); - -STSCursor tsBufGetCursor(STSBuf* pTSBuf); -void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); - -void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur); - -/** - * display all data in comp block file, for debug purpose only - * @param pTSBuf - */ -void tsBufDisplay(STSBuf* pTSBuf); - -int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf); - -void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id); - -int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t id, void* buf, int32_t* len, int32_t* numOfBlocks); - -STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag); - -bool tsBufIsValidElem(STSElem* pElem); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_COMMON_TTSZIP_H_*/ diff --git a/include/common/ttypes.h b/include/common/ttypes.h index f10f419b6f..3934553b1c 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -46,7 +46,7 @@ typedef struct { #pragma pack(pop) #define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) -#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v)) +#define varDataCopy(dst, v) (void)memcpy((dst), (void *)(v), varDataTLen(v)) #define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len)) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 935f9f9b03..79abbc4e68 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -105,12 +105,14 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 65535 #define varDataLen(v) ((VarDataLenT *)(v))[0] #define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE) #define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v)) -#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v)) +#define varDataCopy(dst, v) (void)memcpy((dst), (void *)(v), varDataTLen(v)) #define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len)) -#define IS_VAR_DATA_TYPE(t) \ - (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY)) -#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) +#define IS_VAR_DATA_TYPE(t) \ + (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \ + ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY)) +#define IS_STR_DATA_TYPE(t) \ + (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) static FORCE_INLINE char *udfColDataGetData(const SUdfColumn *pColumn, int32_t row) { if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) { @@ -158,7 +160,7 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne } data->varLenCol.varOffsets = (int32_t *)tmp; data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity; - memset(&data->varLenCol.varOffsets[existedRows], 0, sizeof(int32_t) * (allocCapacity - existedRows)); + (void)memset(&data->varLenCol.varOffsets[existedRows], 0, sizeof(int32_t) * (allocCapacity - existedRows)); // for payload, add data in udfColDataAppend } else { char *tmp = (char *)realloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity)); @@ -166,11 +168,11 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne return TSDB_CODE_OUT_OF_MEMORY; } uint32_t extend = BitmapLen(allocCapacity) - BitmapLen(data->rowsAlloc); - memset(tmp + BitmapLen(data->rowsAlloc), 0, extend); + (void)memset(tmp + BitmapLen(data->rowsAlloc), 0, extend); data->fixLenCol.nullBitmap = tmp; data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity); int32_t oldLen = BitmapLen(existedRows); - memset(&data->fixLenCol.nullBitmap[oldLen], 0, BitmapLen(allocCapacity) - oldLen); + (void)memset(&data->fixLenCol.nullBitmap[oldLen], 0, BitmapLen(allocCapacity) - oldLen); if (meta->type == TSDB_DATA_TYPE_NULL) { return TSDB_CODE_SUCCESS; @@ -198,7 +200,8 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { udfColDataSetNull_f(pColumn, row); } pColumn->hasNull = true; - pColumn->colData.numOfRows = ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; + pColumn->colData.numOfRows = + ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; } static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) { @@ -211,7 +214,7 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR } else { if (!isVarCol) { udfColDataSetNotNull_f(pColumn, currentRow); - memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); + (void)memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); } else { int32_t dataLen = varDataTLen(pData); if (meta->type == TSDB_DATA_TYPE_JSON) { @@ -249,7 +252,7 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR uint32_t len = data->varLenCol.payloadLen; data->varLenCol.varOffsets[currentRow] = len; - memcpy(data->varLenCol.payload + len, pData, dataLen); + (void)memcpy(data->varLenCol.payload + len, pData, dataLen); data->varLenCol.payloadLen += dataLen; } } @@ -278,8 +281,8 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU typedef struct SScriptUdfInfo { const char *name; - int32_t version; - int64_t createdTime; + int32_t version; + int64_t createdTime; EUdfFuncType funcType; int8_t scriptType; diff --git a/include/util/tarray2.h b/include/util/tarray2.h index 0d1ceded6c..ce24a1d2ce 100644 --- a/include/util/tarray2.h +++ b/include/util/tarray2.h @@ -71,9 +71,9 @@ static FORCE_INLINE int32_t tarray2InsertBatch(void *arr, int32_t idx, const voi } if (ret == 0) { if (idx < a->size) { - memmove(a->data + (idx + numEle) * eleSize, a->data + idx * eleSize, (a->size - idx) * eleSize); + (void)memmove(a->data + (idx + numEle) * eleSize, a->data + idx * eleSize, (a->size - idx) * eleSize); } - memcpy(a->data + idx * eleSize, elePtr, numEle * eleSize); + (void)memcpy(a->data + idx * eleSize, elePtr, numEle * eleSize); a->size += numEle; } return ret; @@ -145,18 +145,18 @@ static FORCE_INLINE int32_t tarray2SortInsert(void *arr, const void *elePtr, int #define TARRAY2_SORT_INSERT(a, e, cmp) tarray2SortInsert(a, &(e), sizeof(((a)->data[0])), (__compar_fn_t)cmp) #define TARRAY2_SORT_INSERT_P(a, ep, cmp) tarray2SortInsert(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp) -#define TARRAY2_REMOVE(a, idx, cb) \ - do { \ - if ((idx) < (a)->size) { \ - if (cb) { \ - TArray2Cb cb_ = (TArray2Cb)(cb); \ - cb_((a)->data + (idx)); \ - } \ - if ((idx) < (a)->size - 1) { \ - memmove((a)->data + (idx), (a)->data + (idx) + 1, sizeof((*(a)->data)) * ((a)->size - (idx)-1)); \ - } \ - (a)->size--; \ - } \ +#define TARRAY2_REMOVE(a, idx, cb) \ + do { \ + if ((idx) < (a)->size) { \ + if (cb) { \ + TArray2Cb cb_ = (TArray2Cb)(cb); \ + cb_((a)->data + (idx)); \ + } \ + if ((idx) < (a)->size - 1) { \ + (void)memmove((a)->data + (idx), (a)->data + (idx) + 1, sizeof((*(a)->data)) * ((a)->size - (idx)-1)); \ + } \ + (a)->size--; \ + } \ } while (0) #define TARRAY2_FOREACH(a, e) for (int32_t __i = 0; __i < (a)->size && ((e) = (a)->data[__i], 1); __i++) diff --git a/include/util/tcoding.h b/include/util/tcoding.h index b1bd09e123..1040adf431 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -79,7 +79,7 @@ static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) { static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) { if (buf != NULL) { if (IS_LITTLE_ENDIAN()) { - memcpy(*buf, &value, sizeof(value)); + TAOS_MEMCPY(*buf, &value, sizeof(value)); } else { ((uint8_t *)(*buf))[0] = value & 0xff; ((uint8_t *)(*buf))[1] = (value >> 8) & 0xff; @@ -92,7 +92,7 @@ static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) { static FORCE_INLINE void *taosDecodeFixedU16(const void *buf, uint16_t *value) { if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); + TAOS_MEMCPY(value, buf, sizeof(*value)); } else { ((uint8_t *)value)[1] = ((uint8_t *)buf)[0]; ((uint8_t *)value)[0] = ((uint8_t *)buf)[1]; @@ -117,7 +117,7 @@ static FORCE_INLINE void *taosDecodeFixedI16(const void *buf, int16_t *value) { static FORCE_INLINE int32_t taosEncodeFixedU32(void **buf, uint32_t value) { if (buf != NULL) { if (IS_LITTLE_ENDIAN()) { - memcpy(*buf, &value, sizeof(value)); + TAOS_MEMCPY(*buf, &value, sizeof(value)); } else { ((uint8_t *)(*buf))[0] = value & 0xff; ((uint8_t *)(*buf))[1] = (value >> 8) & 0xff; @@ -132,7 +132,7 @@ static FORCE_INLINE int32_t taosEncodeFixedU32(void **buf, uint32_t value) { static FORCE_INLINE void *taosDecodeFixedU32(const void *buf, uint32_t *value) { if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); + TAOS_MEMCPY(value, buf, sizeof(*value)); } else { ((uint8_t *)value)[3] = ((uint8_t *)buf)[0]; ((uint8_t *)value)[2] = ((uint8_t *)buf)[1]; @@ -159,7 +159,7 @@ static FORCE_INLINE void *taosDecodeFixedI32(const void *buf, int32_t *value) { static FORCE_INLINE int32_t taosEncodeFixedU64(void **buf, uint64_t value) { if (buf != NULL) { if (IS_LITTLE_ENDIAN()) { - memcpy(*buf, &value, sizeof(value)); + TAOS_MEMCPY(*buf, &value, sizeof(value)); } else { ((uint8_t *)(*buf))[0] = value & 0xff; ((uint8_t *)(*buf))[1] = (value >> 8) & 0xff; @@ -179,7 +179,7 @@ static FORCE_INLINE int32_t taosEncodeFixedU64(void **buf, uint64_t value) { static FORCE_INLINE void *taosDecodeFixedU64(const void *buf, uint64_t *value) { if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); + TAOS_MEMCPY(value, buf, sizeof(*value)); } else { ((uint8_t *)value)[7] = ((uint8_t *)buf)[0]; ((uint8_t *)value)[6] = ((uint8_t *)buf)[1]; @@ -357,7 +357,7 @@ static FORCE_INLINE int32_t taosEncodeString(void **buf, const char *value) { tlen += taosEncodeVariantU64(buf, size); if (buf != NULL) { - memcpy(*buf, value, size); + TAOS_MEMCPY(*buf, value, size); *buf = POINTER_SHIFT(*buf, size); } tlen += (int32_t)size; @@ -372,7 +372,7 @@ static FORCE_INLINE void *taosDecodeString(const void *buf, char **value) { *value = (char *)taosMemoryMalloc((size_t)size + 1); if (*value == NULL) return NULL; - memcpy(*value, buf, (size_t)size); + TAOS_MEMCPY(*value, buf, (size_t)size); (*value)[size] = '\0'; @@ -383,7 +383,7 @@ static FORCE_INLINE void *taosDecodeStringTo(const void *buf, char *value) { uint64_t size = 0; buf = taosDecodeVariantU64(buf, &size); - memcpy(value, buf, (size_t)size); + TAOS_MEMCPY(value, buf, (size_t)size); value[size] = '\0'; @@ -395,7 +395,7 @@ static FORCE_INLINE int32_t taosEncodeBinary(void **buf, const void *value, int3 int32_t tlen = 0; if (buf != NULL) { - memcpy(*buf, value, valueLen); + TAOS_MEMCPY(*buf, value, valueLen); *buf = POINTER_SHIFT(*buf, valueLen); } tlen += (int32_t)valueLen; @@ -406,13 +406,13 @@ static FORCE_INLINE int32_t taosEncodeBinary(void **buf, const void *value, int3 static FORCE_INLINE void *taosDecodeBinary(const void *buf, void **value, int32_t valueLen) { *value = taosMemoryMalloc((size_t)valueLen); if (*value == NULL) return NULL; - memcpy(*value, buf, (size_t)valueLen); + TAOS_MEMCPY(*value, buf, (size_t)valueLen); return POINTER_SHIFT(buf, valueLen); } static FORCE_INLINE void *taosDecodeBinaryTo(const void *buf, void *value, int32_t valueLen) { - memcpy(value, buf, (size_t)valueLen); + TAOS_MEMCPY(value, buf, (size_t)valueLen); return POINTER_SHIFT(buf, valueLen); } diff --git a/include/util/tdef.h b/include/util/tdef.h index 53bb8a493c..c9677845ac 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -131,10 +131,10 @@ static const int64_t TICK_PER_SECOND[] = { : ((precision) == TSDB_TIME_PRECISION_MICRO ? 1000000LL : 1000000000LL))) #define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member) -#define T_APPEND_MEMBER(dst, ptr, type, member) \ - do { \ - memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member)); \ - dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member)); \ +#define T_APPEND_MEMBER(dst, ptr, type, member) \ + do { \ + (void)memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member)); \ + dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member)); \ } while (0) #define T_READ_MEMBER(src, type, target) \ do { \ @@ -556,7 +556,7 @@ typedef struct { char name[TSDB_LOG_VAR_LEN]; } SLogVar; -#define TMQ_SEPARATOR ":" +#define TMQ_SEPARATOR ":" #define TMQ_SEPARATOR_CHAR ':' enum { diff --git a/include/util/tencode.h b/include/util/tencode.h index 9b9f8af1b6..b66e79fa60 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -127,7 +127,7 @@ static FORCE_INLINE int32_t tEncodeFixed(SEncoder* pCoder, const void* val, uint if (pCoder->pos + size > pCoder->size) { TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE); } - memcpy(pCoder->data + pCoder->pos, val, size); + TAOS_MEMCPY(pCoder->data + pCoder->pos, val, size); } pCoder->pos += size; @@ -212,7 +212,7 @@ static FORCE_INLINE int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, if (pCoder->pos + len > pCoder->size) { TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE); } - memcpy(pCoder->data + pCoder->pos, val, len); + TAOS_MEMCPY(pCoder->data + pCoder->pos, val, len); } pCoder->pos += len; @@ -233,7 +233,7 @@ static int32_t tDecodeFixed(SDecoder* pCoder, void* val, uint32_t size) { if (pCoder->pos + size > pCoder->size) { TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE); } else if (val) { - memcpy(val, pCoder->data + pCoder->pos, size); + TAOS_MEMCPY(val, pCoder->data + pCoder->pos, size); } pCoder->pos += size; return 0; @@ -427,7 +427,7 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) { uint32_t len; TAOS_CHECK_RETURN(tDecodeCStrAndLen(pCoder, &pStr, &len)); - memcpy(val, pStr, len + 1); + TAOS_MEMCPY(val, pStr, len + 1); return 0; } @@ -446,7 +446,7 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uin TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(*val, pCoder->data + pCoder->pos, length); + TAOS_MEMCPY(*val, pCoder->data + pCoder->pos, length); pCoder->pos += length; } else { @@ -468,7 +468,7 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc32(SDecoder* pCoder, void** val, u if (*val == NULL) { TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(*val, pCoder->data + pCoder->pos, length); + TAOS_MEMCPY(*val, pCoder->data + pCoder->pos, length); pCoder->pos += length; } else { @@ -757,7 +757,7 @@ static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nDat int n = 0; n += tPutU32v(p ? p + n : p, nData); - if (p) memcpy(p + n, pData, nData); + if (p) TAOS_MEMCPY(p + n, pData, nData); n += nData; return n; diff --git a/include/util/tutil.h b/include/util/tutil.h index a41f7d5860..305af76dea 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -64,7 +64,7 @@ static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *tar tMD5Init(&context); tMD5Update(&context, inBuf, (uint32_t)inLen); tMD5Final(&context); - memcpy(target, context.digest, tListLen(context.digest)); + (void)memcpy(target, context.digest, tListLen(context.digest)); } static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *target) { @@ -79,7 +79,7 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar context.digest[2], context.digest[3], context.digest[4], context.digest[5], context.digest[6], context.digest[7], context.digest[8], context.digest[9], context.digest[10], context.digest[11], context.digest[12], context.digest[13], context.digest[14], context.digest[15]); - memcpy(target, buf, TSDB_PASSWORD_LEN); + (void)memcpy(target, buf, TSDB_PASSWORD_LEN); } static FORCE_INLINE int32_t taosCreateMD5Hash(char *pBuf, int32_t len) { diff --git a/include/util/types.h b/include/util/types.h index 0aa01a66f5..1797021b55 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -38,7 +38,7 @@ static FORCE_INLINE float taos_align_get_float(const char *pBuf) { assert(sizeof(float) == sizeof(uint32_t)); #endif float fv = 0; - memcpy(&fv, pBuf, sizeof(fv)); // in ARM, return *((const float*)(pBuf)) may cause problem + (void)memcpy(&fv, pBuf, sizeof(fv)); // in ARM, return *((const float*)(pBuf)) may cause problem return fv; } @@ -49,7 +49,7 @@ static FORCE_INLINE double taos_align_get_double(const char *pBuf) { assert(sizeof(double) == sizeof(uint64_t)); #endif double dv = 0; - memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem + (void)memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem return dv; } diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c deleted file mode 100644 index 38659eea44..0000000000 --- a/source/common/src/ttszip.c +++ /dev/null @@ -1,1143 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "ttszip.h" -#include "taoserror.h" -#include "tcompression.h" -#include "tlog.h" - -static int32_t getDataStartOffset(); -static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo); -static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); -static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); - -/** - * todo error handling - * support auto closeable tmp file - * @param path - * @return - */ -STSBuf* tsBufCreate(bool autoDelete, int32_t order) { - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_DISKSPACE; - // tscError("tmp file created failed since %s", terrstr()); - return NULL; - } - - STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf)); - if (pTSBuf == NULL) { - return NULL; - } - - pTSBuf->autoDelete = autoDelete; - - taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path); - // pTSBuf->pFile = fopen(pTSBuf->path, "wb+"); - pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); - if (pTSBuf->pFile == NULL) { - taosMemoryFree(pTSBuf); - return NULL; - } - - if (!autoDelete) { - if (taosRemoveFile(pTSBuf->path) != 0) { - taosMemoryFree(pTSBuf); - return NULL; - } - } - - if (allocResForTSBuf(pTSBuf) == NULL) { - return NULL; - } - - // update the header info - STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = TSDB_ORDER_ASC}; - STSBufUpdateHeader(pTSBuf, &header); - - tsBufResetPos(pTSBuf); - pTSBuf->cur.order = TSDB_ORDER_ASC; - - pTSBuf->tsOrder = order; - - return pTSBuf; -} - -STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { - STSBuf* pTSBuf = taosMemoryCalloc(1, sizeof(STSBuf)); - if (pTSBuf == NULL) { - return NULL; - } - - pTSBuf->autoDelete = autoDelete; - - tstrncpy(pTSBuf->path, path, sizeof(pTSBuf->path)); - - // pTSBuf->pFile = fopen(pTSBuf->path, "rb+"); - pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_WRITE | TD_FILE_READ); - if (pTSBuf->pFile == NULL) { - taosMemoryFree(pTSBuf); - return NULL; - } - - if (allocResForTSBuf(pTSBuf) == NULL) { - return NULL; - } - - // validate the file magic number - STSBufFileHeader header = {0}; - int32_t ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); - UNUSED(ret); - size_t sz = taosReadFile(pTSBuf->pFile, &header, sizeof(STSBufFileHeader)); - UNUSED(sz); - - // invalid file - if (header.magic != TS_COMP_FILE_MAGIC) { - tsBufDestroy(pTSBuf); - return NULL; - } - - if (header.numOfGroup > pTSBuf->numOfAlloc) { - pTSBuf->numOfAlloc = header.numOfGroup; - STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * pTSBuf->numOfAlloc); - if (tmp == NULL) { - tsBufDestroy(pTSBuf); - return NULL; - } - - pTSBuf->pData = tmp; - } - - pTSBuf->numOfGroups = header.numOfGroup; - - // check the ts order - pTSBuf->tsOrder = header.tsOrder; - if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) { - // tscError("invalid order info in buf:%d", pTSBuf->tsOrder); - tsBufDestroy(pTSBuf); - return NULL; - } - - size_t infoSize = sizeof(STSGroupBlockInfo) * pTSBuf->numOfGroups; - - STSGroupBlockInfo* buf = (STSGroupBlockInfo*)taosMemoryCalloc(1, infoSize); - if (buf == NULL) { - tsBufDestroy(pTSBuf); - return NULL; - } - - // int64_t pos = ftell(pTSBuf->pFile); //pos not used - sz = taosReadFile(pTSBuf->pFile, buf, infoSize); - UNUSED(sz); - - // the length value for each vnode is not kept in file, so does not set the length value - for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) { - STSGroupBlockInfoEx* pBlockList = &pTSBuf->pData[i]; - memcpy(&pBlockList->info, &buf[i], sizeof(STSGroupBlockInfo)); - } - taosMemoryFree(buf); - - ret = taosLSeekFile(pTSBuf->pFile, 0, SEEK_END); - UNUSED(ret); - - int64_t file_size; - if (taosFStatFile(pTSBuf->pFile, &file_size, NULL) != 0) { - tsBufDestroy(pTSBuf); - return NULL; - } - - pTSBuf->fileSize = (uint32_t)file_size; - tsBufResetPos(pTSBuf); - - // ascending by default - pTSBuf->cur.order = TSDB_ORDER_ASC; - - // tscDebug("create tsBuf from file:%s, fd:%d, size:%d, numOfGroups:%d, autoDelete:%d", pTSBuf->path, - // fileno(pTSBuf->pFile), - // pTSBuf->fileSize, pTSBuf->numOfGroups, pTSBuf->autoDelete); - - return pTSBuf; -} - -void* tsBufDestroy(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return NULL; - } - - taosMemoryFreeClear(pTSBuf->assistBuf); - taosMemoryFreeClear(pTSBuf->tsData.rawBuf); - - taosMemoryFreeClear(pTSBuf->pData); - taosMemoryFreeClear(pTSBuf->block.payload); - - if (!pTSBuf->remainOpen) { - taosCloseFile(&pTSBuf->pFile); - } - - if (pTSBuf->autoDelete) { - // ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); - if (taosRemoveFile(pTSBuf->path) != 0) { - // tscError("tsBuf %p destroyed, failed to remove tmp file:%s", pTSBuf, pTSBuf->path); - } - } else { - // tscDebug("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); - } - - taosVariantDestroy(&pTSBuf->block.tag); - taosMemoryFree(pTSBuf); - return NULL; -} - -static STSGroupBlockInfoEx* tsBufGetLastGroupInfo(STSBuf* pTSBuf) { - int32_t last = pTSBuf->numOfGroups - 1; - - ASSERT(last >= 0); - return &pTSBuf->pData[last]; -} - -static STSGroupBlockInfoEx* addOneGroupInfo(STSBuf* pTSBuf, int32_t id) { - if (pTSBuf->numOfAlloc <= pTSBuf->numOfGroups) { - uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); - ASSERT((int32_t)newSize > pTSBuf->numOfAlloc); - - STSGroupBlockInfoEx* tmp = - (STSGroupBlockInfoEx*)taosMemoryRealloc(pTSBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize); - if (tmp == NULL) { - return NULL; - } - - pTSBuf->pData = tmp; - pTSBuf->numOfAlloc = newSize; - memset(&pTSBuf->pData[pTSBuf->numOfGroups], 0, sizeof(STSGroupBlockInfoEx) * (newSize - pTSBuf->numOfGroups)); - } - - if (pTSBuf->numOfGroups > 0) { - STSGroupBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); - - // update prev vnode length info in file - TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pPrevBlockInfoEx->info); - } - - // set initial value for vnode block - STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfGroups].info; - pBlockInfo->id = id; - pBlockInfo->offset = pTSBuf->fileSize; - ASSERT(pBlockInfo->offset >= getDataStartOffset()); - - // update vnode info in file - TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups, pBlockInfo); - - // add one vnode info - pTSBuf->numOfGroups += 1; - - // update the header info - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; - - STSBufUpdateHeader(pTSBuf, &header); - return tsBufGetLastGroupInfo(pTSBuf); -} - -static void shrinkBuffer(STSList* ptsData) { - // shrink tmp buffer size if it consumes too many memory compared to the pre-defined size - if (ptsData->allocSize >= ptsData->threshold * 2) { - char* rawBuf = taosMemoryRealloc(ptsData->rawBuf, MEM_BUF_SIZE); - if (rawBuf) { - ptsData->rawBuf = rawBuf; - ptsData->allocSize = MEM_BUF_SIZE; - } - } -} - -static int32_t getTagAreaLength(SVariant* pa) { - int32_t t = sizeof(pa->nLen) * 2 + sizeof(pa->nType); - if (pa->nType != TSDB_DATA_TYPE_NULL) { - t += pa->nLen; - } - - return t; -} - -static void writeDataToDisk(STSBuf* pTSBuf) { - if (pTSBuf->tsData.len == 0) { - return; - } - - STSBlock* pBlock = &pTSBuf->block; - STSList* pTsData = &pTSBuf->tsData; - - pBlock->numOfElem = pTsData->len / TSDB_KEYSIZE; - pBlock->compLen = tsCompressTimestamp(pTsData->rawBuf, pTsData->len, pTsData->len / TSDB_KEYSIZE, pBlock->payload, - pTsData->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - - int64_t r = taosLSeekFile(pTSBuf->pFile, pTSBuf->fileSize, SEEK_SET); - ASSERT(r == 0); - - /* - * format for output data: - * 1. tags, number of ts, size after compressed, payload, size after compressed - * 2. tags, number of ts, size after compressed, payload, size after compressed - * - * both side has the compressed length is used to support load data forwards/backwords. - */ - int32_t metaLen = 0; - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType)); - - int32_t trueLen = pBlock->tag.nLen; - if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_VARBINARY || - pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR || pBlock->tag.nType == TSDB_DATA_TYPE_GEOMETRY) { - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen); - } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) { - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - float tfloat = (float)pBlock->tag.d; - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen); - } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen); - } else { - trueLen = 0; - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); - } - - taosWriteFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem)); - taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen)); - taosWriteFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen); - taosWriteFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen)); - - metaLen += (int32_t)taosWriteFile(pTSBuf->pFile, &trueLen, sizeof(pBlock->tag.nLen)); - ASSERT(metaLen == getTagAreaLength(&pBlock->tag)); - - int32_t blockSize = metaLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; - pTSBuf->fileSize += blockSize; - - pTSBuf->tsData.len = 0; - - STSGroupBlockInfoEx* pGroupBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); - - pGroupBlockInfoEx->info.compLen += blockSize; - pGroupBlockInfoEx->info.numOfBlocks += 1; - - shrinkBuffer(&pTSBuf->tsData); -} - -static void expandBuffer(STSList* ptsData, int32_t inputSize) { - if (ptsData->allocSize - ptsData->len < inputSize) { - int32_t newSize = inputSize + ptsData->len; - char* tmp = taosMemoryRealloc(ptsData->rawBuf, (size_t)newSize); - if (tmp == NULL) { - // todo - } - - ptsData->rawBuf = tmp; - ptsData->allocSize = newSize; - } -} - -STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { - STSBlock* pBlock = &pTSBuf->block; - - // clear the memory buffer - pBlock->compLen = 0; - pBlock->padding = 0; - pBlock->numOfElem = 0; - - int32_t offset = -1; - - if (order == TSDB_ORDER_DESC) { - /* - * set the right position for the reversed traverse, the reversed traverse is started from - * the end of each comp data block - */ - int32_t prev = -(int32_t)(sizeof(pBlock->padding) + sizeof(pBlock->tag.nLen)); - int32_t ret = taosLSeekFile(pTSBuf->pFile, prev, SEEK_CUR); - size_t sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); - sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - UNUSED(sz); - - pBlock->compLen = pBlock->padding; - - offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag); - ret = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR); - UNUSED(ret); - } - - int32_t ret = taosReadFile(pTSBuf->pFile, &pBlock->tag.nType, sizeof(pBlock->tag.nType)); - ret = taosReadFile(pTSBuf->pFile, &pBlock->tag.nLen, sizeof(pBlock->tag.nLen)); - - // NOTE: mix types tags are not supported - size_t sz = 0; - if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_VARBINARY || - pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR || pBlock->tag.nType == TSDB_DATA_TYPE_GEOMETRY) { - char* tp = taosMemoryRealloc(pBlock->tag.pz, pBlock->tag.nLen + 1); - ASSERT(tp != NULL); - - memset(tp, 0, pBlock->tag.nLen + 1); - pBlock->tag.pz = tp; - - sz = taosReadFile(pTSBuf->pFile, pBlock->tag.pz, (size_t)pBlock->tag.nLen); - UNUSED(sz); - } else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) { - float tfloat = 0; - sz = taosReadFile(pTSBuf->pFile, &tfloat, (size_t)pBlock->tag.nLen); - pBlock->tag.d = (double)tfloat; - UNUSED(sz); - } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { // TODO check the return value - sz = taosReadFile(pTSBuf->pFile, &pBlock->tag.i, (size_t)pBlock->tag.nLen); - UNUSED(sz); - } - - sz = taosReadFile(pTSBuf->pFile, &pBlock->numOfElem, sizeof(pBlock->numOfElem)); - UNUSED(sz); - sz = taosReadFile(pTSBuf->pFile, &pBlock->compLen, sizeof(pBlock->compLen)); - UNUSED(sz); - sz = taosReadFile(pTSBuf->pFile, pBlock->payload, (size_t)pBlock->compLen); - - if (decomp) { - pTSBuf->tsData.len = - tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, - pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - } - - // read the comp length at the length of comp block - sz = taosReadFile(pTSBuf->pFile, &pBlock->padding, sizeof(pBlock->padding)); - ASSERT(pBlock->padding == pBlock->compLen); - - int32_t n = 0; - sz = taosReadFile(pTSBuf->pFile, &n, sizeof(pBlock->tag.nLen)); - if (pBlock->tag.nType == TSDB_DATA_TYPE_NULL) { - ASSERT(n == 0); - } else { - ASSERT(n == pBlock->tag.nLen); - } - - UNUSED(sz); - - // for backwards traverse, set the start position at the end of previous block - if (order == TSDB_ORDER_DESC) { - int32_t r = taosLSeekFile(pTSBuf->pFile, -offset, SEEK_CUR); - UNUSED(r); - } - - return pBlock; -} - -// set the order of ts buffer if the ts order has not been set yet -static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { - STSList* ptsData = &pTSBuf->tsData; - - if (pTSBuf->tsOrder == -1) { - if (ptsData->len > 0) { - TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE); - - if (lastKey > *(TSKEY*)pData) { - pTSBuf->tsOrder = TSDB_ORDER_DESC; - } else { - pTSBuf->tsOrder = TSDB_ORDER_ASC; - } - } else if (len > TSDB_KEYSIZE) { - // no data in current vnode, more than one ts is added, check the orders - TSKEY k1 = *(TSKEY*)(pData); - TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE); - - if (k1 < k2) { - pTSBuf->tsOrder = TSDB_ORDER_ASC; - } else if (k1 > k2) { - pTSBuf->tsOrder = TSDB_ORDER_DESC; - } else { - // todo handle error - } - } - } else { - // todo the timestamp order is set, check the asc/desc order of appended data - } - - return TSDB_CODE_SUCCESS; -} - -void tsBufAppend(STSBuf* pTSBuf, int32_t id, SVariant* tag, const char* pData, int32_t len) { - STSGroupBlockInfoEx* pBlockInfo = NULL; - STSList* ptsData = &pTSBuf->tsData; - - if (pTSBuf->numOfGroups == 0 || tsBufGetLastGroupInfo(pTSBuf)->info.id != id) { - writeDataToDisk(pTSBuf); - shrinkBuffer(ptsData); - - pBlockInfo = addOneGroupInfo(pTSBuf, id); - } else { - pBlockInfo = tsBufGetLastGroupInfo(pTSBuf); - } - - ASSERT(pBlockInfo->info.id == id); - - if ((taosVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) { - // new arrived data with different tags value, save current value into disk first - writeDataToDisk(pTSBuf); - } else { - expandBuffer(ptsData, len); - } - - taosVariantAssign(&pTSBuf->block.tag, tag); - memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); - - // todo check return value - setCheckTSOrder(pTSBuf, pData, len); - - ptsData->len += len; - pBlockInfo->len += len; - - pTSBuf->numOfTotal += len / TSDB_KEYSIZE; - - // the size of raw data exceeds the size of the default prepared buffer, so - // during getBufBlock, the output buffer needs to be large enough. - if (ptsData->len >= ptsData->threshold) { - writeDataToDisk(pTSBuf); - shrinkBuffer(ptsData); - } - - tsBufResetPos(pTSBuf); -} - -void tsBufFlush(STSBuf* pTSBuf) { - if (pTSBuf->tsData.len <= 0) { - return; - } - - writeDataToDisk(pTSBuf); - shrinkBuffer(&pTSBuf->tsData); - - STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pTSBuf); - - // update prev vnode length info in file - TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, &pBlockInfoEx->info); - - // save the ts order into header - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; - STSBufUpdateHeader(pTSBuf, &header); -} - -static int32_t tsBufFindGroupById(STSGroupBlockInfoEx* pGroupInfoEx, int32_t numOfGroups, int32_t id) { - int32_t j = -1; - for (int32_t i = 0; i < numOfGroups; ++i) { - if (pGroupInfoEx[i].info.id == id) { - j = i; - break; - } - } - - return j; -} - -// todo opt performance by cache blocks info -static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, int32_t blockIndex) { - if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) { - return -1; - } - - // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode - int32_t i = 0; - bool decomp = false; - - while ((i++) <= blockIndex) { - if (readDataFromDisk(pTSBuf, TSDB_ORDER_ASC, decomp) == NULL) { - return -1; - } - } - - // set the file position to be the end of previous comp block - if (pTSBuf->cur.order == TSDB_ORDER_DESC) { - STSBlock* pBlock = &pTSBuf->block; - int32_t compBlockSize = - pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + getTagAreaLength(&pBlock->tag); - int32_t ret = taosLSeekFile(pTSBuf->pFile, -compBlockSize, SEEK_CUR); - UNUSED(ret); - } - - return 0; -} - -static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSGroupBlockInfo* pBlockInfo, SVariant* tag) { - bool decomp = false; - - int64_t offset = 0; - if (pTSBuf->cur.order == TSDB_ORDER_ASC) { - offset = pBlockInfo->offset; - } else { // reversed traverse starts from the end of block - offset = pBlockInfo->offset + pBlockInfo->compLen; - } - - if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) { - return -1; - } - - for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) { - if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) { - return -1; - } - - if (taosVariantCompare(&pTSBuf->block.tag, tag) == 0) { - return (pTSBuf->cur.order == TSDB_ORDER_ASC) ? i : (pBlockInfo->numOfBlocks - (i + 1)); - } - } - - return -1; -} - -static void tsBufGetBlock(STSBuf* pTSBuf, int32_t groupIndex, int32_t blockIndex) { - STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info; - if (pBlockInfo->numOfBlocks <= blockIndex) { - ASSERT(false); - } - - STSCursor* pCur = &pTSBuf->cur; - if (pCur->vgroupIndex == groupIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSDB_ORDER_ASC) || - (pCur->blockIndex >= blockIndex && pCur->order == TSDB_ORDER_DESC))) { - int32_t i = 0; - bool decomp = false; - int32_t step = abs(blockIndex - pCur->blockIndex); - - while ((++i) <= step) { - if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) { - return; - } - } - } else { - if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) { - ASSERT(false); - } - } - - STSBlock* pBlock = &pTSBuf->block; - - size_t s = pBlock->numOfElem * TSDB_KEYSIZE; - - /* - * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value - * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function - */ - if (s > pTSBuf->tsData.allocSize) { - expandBuffer(&pTSBuf->tsData, (int32_t)s); - } - - pTSBuf->tsData.len = - tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, - pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - - ASSERT((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); - - pCur->vgroupIndex = groupIndex; - pCur->blockIndex = blockIndex; - - pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1; -} - -static int32_t doUpdateGroupInfo(STSBuf* pTSBuf, int64_t offset, STSGroupBlockInfo* pVInfo) { - if (offset < 0 || offset >= getDataStartOffset()) { - return -1; - } - - if (taosLSeekFile(pTSBuf->pFile, (int32_t)offset, SEEK_SET) != 0) { - return -1; - } - - taosWriteFile(pTSBuf->pFile, pVInfo, sizeof(STSGroupBlockInfo)); - return 0; -} - -STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id) { - int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id); - if (j == -1) { - return NULL; - } - - return &pTSBuf->pData[j].info; -} - -int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { - if ((pTSBuf->pFile == NULL) || pHeader == NULL || pHeader->numOfGroup == 0 || pHeader->magic != TS_COMP_FILE_MAGIC) { - return -1; - } - - if (pHeader->tsOrder != TSDB_ORDER_ASC && pHeader->tsOrder != TSDB_ORDER_DESC) { - return -1; - } - - int32_t r = taosLSeekFile(pTSBuf->pFile, 0, SEEK_SET); - if (r != 0) { - // qError("fseek failed, errno:%d", errno); - return -1; - } - - size_t ws = taosWriteFile(pTSBuf->pFile, pHeader, sizeof(STSBufFileHeader)); - if (ws != 1) { - // qError("ts update header fwrite failed, size:%d, expected size:%d", (int32_t)ws, - // (int32_t)sizeof(STSBufFileHeader)); - return -1; - } - return 0; -} - -bool tsBufNextPos(STSBuf* pTSBuf) { - if (pTSBuf == NULL || pTSBuf->numOfGroups == 0) { - return false; - } - - STSCursor* pCur = &pTSBuf->cur; - - // get the first/last position according to traverse order - if (pCur->vgroupIndex == -1) { - if (pCur->order == TSDB_ORDER_ASC) { - tsBufGetBlock(pTSBuf, 0, 0); - - if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return - tsBufResetPos(pTSBuf); - return false; - } else { - return true; - } - - } else { // get the last timestamp record in the last block of the last vnode - ASSERT(pTSBuf->numOfGroups > 0); - - int32_t groupIndex = pTSBuf->numOfGroups - 1; - pCur->vgroupIndex = groupIndex; - - int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id; - STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id); - int32_t blockIndex = pBlockInfo->numOfBlocks - 1; - - tsBufGetBlock(pTSBuf, groupIndex, blockIndex); - - pCur->tsIndex = pTSBuf->block.numOfElem - 1; - if (pTSBuf->block.numOfElem == 0) { - tsBufResetPos(pTSBuf); - return false; - } else { - return true; - } - } - } - - int32_t step = pCur->order == TSDB_ORDER_ASC ? 1 : -1; - - while (1) { - ASSERT(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE); - - if ((pCur->order == TSDB_ORDER_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || - (pCur->order == TSDB_ORDER_DESC && pCur->tsIndex <= 0)) { - int32_t id = pTSBuf->pData[pCur->vgroupIndex].info.id; - - STSGroupBlockInfo* pBlockInfo = tsBufGetGroupBlockInfo(pTSBuf, id); - if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSDB_ORDER_ASC) || - (pCur->blockIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { - if ((pCur->vgroupIndex >= pTSBuf->numOfGroups - 1 && pCur->order == TSDB_ORDER_ASC) || - (pCur->vgroupIndex <= 0 && pCur->order == TSDB_ORDER_DESC)) { - pCur->vgroupIndex = -1; - return false; - } - - if (pBlockInfo == NULL) { - return false; - } - - int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1); - tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex); - break; - - } else { - tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex + step); - break; - } - } else { - pCur->tsIndex += step; - break; - } - } - - return true; -} - -void tsBufResetPos(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return; - } - - pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vgroupIndex = -1, .order = pTSBuf->cur.order}; -} - -STSElem tsBufGetElem(STSBuf* pTSBuf) { - STSElem elem1 = {.id = -1}; - if (pTSBuf == NULL) { - return elem1; - } - - STSCursor* pCur = &pTSBuf->cur; - if (pCur != NULL && pCur->vgroupIndex < 0) { - return elem1; - } - - STSBlock* pBlock = &pTSBuf->block; - - elem1.id = pTSBuf->pData[pCur->vgroupIndex].info.id; - elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); - elem1.tag = &pBlock->tag; - - return elem1; -} - -/** - * current only support ts comp data from two vnode merge - * @param pDestBuf - * @param pSrcBuf - * @param id - * @return - */ -int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { - if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfGroups <= 0) { - return 0; - } - - if (pDestBuf->numOfGroups + pSrcBuf->numOfGroups > TS_COMP_FILE_GROUP_MAX) { - return -1; - } - - // src can only have one vnode index - ASSERT(pSrcBuf->numOfGroups == 1); - - // there are data in buffer, flush to disk first - tsBufFlush(pDestBuf); - - // compared with the last vnode id - int32_t id = tsBufGetLastGroupInfo((STSBuf*)pSrcBuf)->info.id; - if (id != tsBufGetLastGroupInfo(pDestBuf)->info.id) { - int32_t oldSize = pDestBuf->numOfGroups; - int32_t newSize = oldSize + pSrcBuf->numOfGroups; - - if (pDestBuf->numOfAlloc < newSize) { - pDestBuf->numOfAlloc = newSize; - - STSGroupBlockInfoEx* tmp = taosMemoryRealloc(pDestBuf->pData, sizeof(STSGroupBlockInfoEx) * newSize); - if (tmp == NULL) { - return -1; - } - - pDestBuf->pData = tmp; - } - - // directly copy the vnode index information - memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfGroups * sizeof(STSGroupBlockInfoEx)); - - // set the new offset value - for (int32_t i = 0; i < pSrcBuf->numOfGroups; ++i) { - STSGroupBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize]; - pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize; - pBlockInfoEx->info.id = id; - } - - pDestBuf->numOfGroups = newSize; - } else { - STSGroupBlockInfoEx* pBlockInfoEx = tsBufGetLastGroupInfo(pDestBuf); - - pBlockInfoEx->len += pSrcBuf->pData[0].len; - pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; - pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; - pBlockInfoEx->info.id = id; - } - - int32_t r = taosLSeekFile(pDestBuf->pFile, 0, SEEK_END); - ASSERT(r == 0); - - int64_t offset = getDataStartOffset(); - int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset; - int64_t written = taosFSendFile(pDestBuf->pFile, pSrcBuf->pFile, &offset, size); - - if (written == -1 || written != size) { - return -1; - } - - pDestBuf->numOfTotal += pSrcBuf->numOfTotal; - - int32_t oldSize = pDestBuf->fileSize; - - // file meta data may be cached, close and reopen the file for accurate file size. - taosCloseFile(&pDestBuf->pFile); - // pDestBuf->pFile = fopen(pDestBuf->path, "rb+"); - pDestBuf->pFile = taosOpenFile(pDestBuf->path, TD_FILE_WRITE | TD_FILE_READ); - if (pDestBuf->pFile == NULL) { - return -1; - } - - int64_t file_size; - if (taosFStatFile(pDestBuf->pFile, &file_size, NULL) != 0) { - return -1; - } - pDestBuf->fileSize = (uint32_t)file_size; - - ASSERT(pDestBuf->fileSize == oldSize + size); - - return 0; -} - -STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order, int32_t id) { - STSBuf* pTSBuf = tsBufCreate(true, order); - - STSGroupBlockInfo* pBlockInfo = &(addOneGroupInfo(pTSBuf, 0)->info); - pBlockInfo->numOfBlocks = numOfBlocks; - pBlockInfo->compLen = len; - pBlockInfo->offset = getDataStartOffset(); - pBlockInfo->id = id; - - // update prev vnode length info in file - TSBufUpdateGroupInfo(pTSBuf, pTSBuf->numOfGroups - 1, pBlockInfo); - - int32_t ret = taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET); - if (ret == -1) { - // qError("fseek failed, errno:%d", errno); - tsBufDestroy(pTSBuf); - return NULL; - } - size_t sz = taosWriteFile(pTSBuf->pFile, (void*)pData, len); - if (sz != len) { - // qError("ts data fwrite failed, write size:%d, expected size:%d", (int32_t)sz, len); - tsBufDestroy(pTSBuf); - return NULL; - } - pTSBuf->fileSize += len; - - pTSBuf->tsOrder = order; - if (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC) { - tsBufDestroy(pTSBuf); - return NULL; - } - - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfGroup = pTSBuf->numOfGroups, .tsOrder = pTSBuf->tsOrder}; - if (STSBufUpdateHeader(pTSBuf, &header) < 0) { - tsBufDestroy(pTSBuf); - return NULL; - } - - // TODO taosFsync?? - // if (taosFsync(fileno(pTSBuf->pFile)) == -1) { - //// qError("fsync failed, errno:%d", errno); - // tsBufDestroy(pTSBuf); - // return NULL; - // } - - return pTSBuf; -} - -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, SVariant* tag) { - STSElem elem = {.id = -1}; - - if (pTSBuf == NULL) { - return elem; - } - - int32_t j = tsBufFindGroupById(pTSBuf->pData, pTSBuf->numOfGroups, id); - if (j == -1) { - return elem; - } - - // for debug purpose - // tsBufDisplay(pTSBuf); - - STSCursor* pCur = &pTSBuf->cur; - STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[j].info; - - int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag); - if (blockIndex < 0) { - return elem; - } - - pCur->vgroupIndex = j; - pCur->blockIndex = blockIndex; - tsBufGetBlock(pTSBuf, j, blockIndex); - - return tsBufGetElem(pTSBuf); -} - -STSCursor tsBufGetCursor(STSBuf* pTSBuf) { - STSCursor c = {.vgroupIndex = -1}; - if (pTSBuf == NULL) { - return c; - } - - return pTSBuf->cur; -} - -void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) { - if (pTSBuf == NULL || pCur == NULL) { - return; - } - - if (pCur->vgroupIndex != -1) { - tsBufGetBlock(pTSBuf, pCur->vgroupIndex, pCur->blockIndex); - } - - pTSBuf->cur = *pCur; -} - -void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) { - if (pTSBuf == NULL) { - return; - } - - pTSBuf->cur.order = order; -} - -STSBuf* tsBufClone(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return NULL; - } - - tsBufFlush(pTSBuf); - - return tsBufCreateFromFile(pTSBuf->path, false); -} - -void tsBufDisplay(STSBuf* pTSBuf) { - printf("-------start of ts comp file-------\n"); - printf("number of vnode:%d\n", pTSBuf->numOfGroups); - - int32_t old = pTSBuf->cur.order; - pTSBuf->cur.order = TSDB_ORDER_ASC; - - tsBufResetPos(pTSBuf); - - while (tsBufNextPos(pTSBuf)) { - STSElem elem = tsBufGetElem(pTSBuf); - if (elem.tag->nType == TSDB_DATA_TYPE_BIGINT) { - printf("%d-%" PRId64 "-%" PRId64 "\n", elem.id, elem.tag->i, elem.ts); - } - } - - pTSBuf->cur.order = old; - printf("-------end of ts comp file-------\n"); -} - -static int32_t getDataStartOffset() { - return sizeof(STSBufFileHeader) + TS_COMP_FILE_GROUP_MAX * sizeof(STSGroupBlockInfo); -} - -// update prev vnode length info in file -static void TSBufUpdateGroupInfo(STSBuf* pTSBuf, int32_t index, STSGroupBlockInfo* pBlockInfo) { - int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSGroupBlockInfo); - doUpdateGroupInfo(pTSBuf, offset, pBlockInfo); -} - -static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { - const int32_t INITIAL_GROUPINFO_SIZE = 4; - - pTSBuf->numOfAlloc = INITIAL_GROUPINFO_SIZE; - pTSBuf->pData = taosMemoryCalloc(pTSBuf->numOfAlloc, sizeof(STSGroupBlockInfoEx)); - if (pTSBuf->pData == NULL) { - tsBufDestroy(pTSBuf); - return NULL; - } - - pTSBuf->tsData.rawBuf = taosMemoryMalloc(MEM_BUF_SIZE); - if (pTSBuf->tsData.rawBuf == NULL) { - tsBufDestroy(pTSBuf); - return NULL; - } - - pTSBuf->bufSize = MEM_BUF_SIZE; - pTSBuf->tsData.threshold = MEM_BUF_SIZE; - pTSBuf->tsData.allocSize = MEM_BUF_SIZE; - - pTSBuf->assistBuf = taosMemoryMalloc(MEM_BUF_SIZE); - if (pTSBuf->assistBuf == NULL) { - tsBufDestroy(pTSBuf); - return NULL; - } - - pTSBuf->block.payload = taosMemoryMalloc(MEM_BUF_SIZE); - if (pTSBuf->block.payload == NULL) { - tsBufDestroy(pTSBuf); - return NULL; - } - - pTSBuf->fileSize += getDataStartOffset(); - return pTSBuf; -} - -int32_t tsBufGetNumOfGroup(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return 0; - } - - return pTSBuf->numOfGroups; -} - -void tsBufGetGroupIdList(STSBuf* pTSBuf, int32_t* num, int32_t** id) { - int32_t size = tsBufGetNumOfGroup(pTSBuf); - if (num != NULL) { - *num = size; - } - - *id = NULL; - if (size == 0) { - return; - } - - (*id) = taosMemoryMalloc(tsBufGetNumOfGroup(pTSBuf) * sizeof(int32_t)); - - for (int32_t i = 0; i < size; ++i) { - (*id)[i] = pTSBuf->pData[i].info.id; - } -} - -int32_t dumpFileBlockByGroupId(STSBuf* pTSBuf, int32_t groupIndex, void* buf, int32_t* len, int32_t* numOfBlocks) { - ASSERT(groupIndex >= 0 && groupIndex < pTSBuf->numOfGroups); - STSGroupBlockInfo* pBlockInfo = &pTSBuf->pData[groupIndex].info; - - *len = 0; - *numOfBlocks = 0; - - if (taosLSeekFile(pTSBuf->pFile, pBlockInfo->offset, SEEK_SET) != 0) { - int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile)); - // qError("%p: fseek failed: %s", pSql, tstrerror(code)); - return code; - } - - size_t s = taosReadFile(pTSBuf->pFile, buf, pBlockInfo->compLen); - if (s != pBlockInfo->compLen) { - int32_t code = TAOS_SYSTEM_ERROR(taosGetErrorFile(pTSBuf->pFile)); - // tscError("%p: fread didn't return expected data: %s", pSql, tstrerror(code)); - return code; - } - - *len = pBlockInfo->compLen; - *numOfBlocks = pBlockInfo->numOfBlocks; - - return TSDB_CODE_SUCCESS; -} - -STSElem tsBufFindElemStartPosByTag(STSBuf* pTSBuf, SVariant* pTag) { - STSElem el = {.id = -1}; - - for (int32_t i = 0; i < pTSBuf->numOfGroups; ++i) { - el = tsBufGetElemStartPos(pTSBuf, pTSBuf->pData[i].info.id, pTag); - if (el.id == pTSBuf->pData[i].info.id) { - return el; - } - } - - return el; -} - -bool tsBufIsValidElem(STSElem* pElem) { return pElem->id >= 0; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 7b27b41177..fbb61c7c90 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -30,11 +30,11 @@ longjmp((_obj), (_c)); \ } while (0) -#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ - do { \ - assert(sizeof(_uid) == sizeof(uint64_t)); \ - *(uint64_t*)(_k) = (_uid); \ - memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ +#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ + do { \ + assert(sizeof(_uid) == sizeof(uint64_t)); \ + *(uint64_t*)(_k) = (_uid); \ + (void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) @@ -175,7 +175,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod int32_t type, SColMatchInfo* pMatchInfo); int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); -int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); +int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 31fdc5b291..eb36141ef0 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -24,7 +24,6 @@ extern "C" { #include "theap.h" #include "tlosertree.h" #include "tsort.h" -#include "ttszip.h" #include "tvariant.h" #include "dataSinkMgt.h" @@ -904,7 +903,7 @@ int32_t appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pE uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, - SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); bool groupbyTbname(SNodeList* pGroupList); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 90fb279259..118571c8aa 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -21,17 +21,16 @@ extern "C" { #endif #include "tpagedbuf.h" -#include "ttszip.h" typedef struct MinMaxEntry { union { - double dMinVal; - //double i64MinVal; + double dMinVal; + // double i64MinVal; uint64_t u64MinVal; }; union { - double dMaxVal; - //double i64MaxVal; + double dMaxVal; + // double i64MaxVal; int64_t u64MaxVal; }; } MinMaxEntry; diff --git a/source/libs/function/src/tfunctionInt.c b/source/libs/function/src/tfunctionInt.c index 80b869b2d2..6a841f6ffa 100644 --- a/source/libs/function/src/tfunctionInt.c +++ b/source/libs/function/src/tfunctionInt.c @@ -25,5 +25,4 @@ #include "tfunctionInt.h" #include "thistogram.h" #include "tpercentile.h" -#include "ttszip.h" #include "tudf.h" diff --git a/source/libs/index/inc/indexUtil.h b/source/libs/index/inc/indexUtil.h index 148a521d5a..308e213ac9 100644 --- a/source/libs/index/inc/indexUtil.h +++ b/source/libs/index/inc/indexUtil.h @@ -21,29 +21,29 @@ extern "C" { #endif -#define SERIALIZE_MEM_TO_BUF(buf, key, mem) \ - do { \ - memcpy((void *)buf, (void *)(&key->mem), sizeof(key->mem)); \ - buf += sizeof(key->mem); \ +#define SERIALIZE_MEM_TO_BUF(buf, key, mem) \ + do { \ + (void)memcpy((void *)buf, (void *)(&key->mem), sizeof(key->mem)); \ + buf += sizeof(key->mem); \ } while (0) -#define SERIALIZE_STR_MEM_TO_BUF(buf, key, mem, len) \ - do { \ - memcpy((void *)buf, (void *)key->mem, len); \ - buf += len; \ +#define SERIALIZE_STR_MEM_TO_BUF(buf, key, mem, len) \ + do { \ + (void)memcpy((void *)buf, (void *)key->mem, len); \ + buf += len; \ } while (0) -#define SERIALIZE_VAR_TO_BUF(buf, var, type) \ - do { \ - type c = var; \ - memcpy((void *)buf, (void *)&c, sizeof(c)); \ - buf += sizeof(c); \ +#define SERIALIZE_VAR_TO_BUF(buf, var, type) \ + do { \ + type c = var; \ + (void)memcpy((void *)buf, (void *)&c, sizeof(c)); \ + buf += sizeof(c); \ } while (0) -#define SERIALIZE_STR_VAR_TO_BUF(buf, var, len) \ - do { \ - memcpy((void *)buf, (void *)var, len); \ - buf += len; \ +#define SERIALIZE_STR_VAR_TO_BUF(buf, var, len) \ + do { \ + (void)memcpy((void *)buf, (void *)var, len); \ + buf += len; \ } while (0) #define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index a645c43f86..ab04f06b02 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -103,7 +103,7 @@ typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char *, void **); typedef struct SFilterDataInfo { int32_t idx; - void* addr; + void *addr; } SFilterDataInfo; typedef struct SFilterRangeCompare { @@ -227,10 +227,9 @@ typedef struct SFltTreeStat { SFilterInfo *info; } SFltTreeStat; - typedef struct SFltScalarCtx { - SNode *node; - SArray* fltSclRange; + SNode *node; + SArray *fltSclRange; } SFltScalarCtx; typedef struct SFltBuildGroupCtx { @@ -271,9 +270,9 @@ struct SFilterInfo { SFilterPCtx pctx; }; -#define FILTER_NO_MERGE_DATA_TYPE(t) \ - ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_VARBINARY || (t) == TSDB_DATA_TYPE_NCHAR || (t) == TSDB_DATA_TYPE_JSON || \ - (t) == TSDB_DATA_TYPE_GEOMETRY) +#define FILTER_NO_MERGE_DATA_TYPE(t) \ + ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_VARBINARY || (t) == TSDB_DATA_TYPE_NCHAR || \ + (t) == TSDB_DATA_TYPE_JSON || (t) == TSDB_DATA_TYPE_GEOMETRY) #define FILTER_NO_MERGE_OPTR(o) ((o) == OP_TYPE_IS_NULL || (o) == OP_TYPE_IS_NOT_NULL || (o) == FILTER_DUMMY_EMPTY_OPTR) #define MR_EMPTY_RES(ctx) (ctx->rs == NULL) @@ -361,29 +360,29 @@ struct SFilterInfo { _r = n; \ } \ } while (0) -#define INSERT_RANGE(ctx, r, ra) \ - do { \ - SFilterRangeNode *n = NULL; \ - FLT_ERR_RET(filterNewRange(ctx, ra, &n)); \ - n->prev = (r)->prev; \ - if ((r)->prev) { \ - (r)->prev->next = n; \ - } else { \ - (ctx)->rs = n; \ - } \ - (r)->prev = n; \ - n->next = r; \ +#define INSERT_RANGE(ctx, r, ra) \ + do { \ + SFilterRangeNode *n = NULL; \ + FLT_ERR_RET(filterNewRange(ctx, ra, &n)); \ + n->prev = (r)->prev; \ + if ((r)->prev) { \ + (r)->prev->next = n; \ + } else { \ + (ctx)->rs = n; \ + } \ + (r)->prev = n; \ + n->next = r; \ } while (0) -#define APPEND_RANGE(ctx, r, ra) \ - do { \ - SFilterRangeNode *n = NULL; \ - FLT_ERR_RET(filterNewRange(ctx, ra, &n)); \ - n->prev = (r); \ - if (r) { \ - (r)->next = n; \ - } else { \ - (ctx)->rs = n; \ - } \ +#define APPEND_RANGE(ctx, r, ra) \ + do { \ + SFilterRangeNode *n = NULL; \ + FLT_ERR_RET(filterNewRange(ctx, ra, &n)); \ + n->prev = (r); \ + if (r) { \ + (r)->next = n; \ + } else { \ + (ctx)->rs = n; \ + } \ } while (0) #define FLT_IS_COMPARISON_OPERATOR(_op) ((_op) >= OP_TYPE_GREATER_THAN && (_op) < OP_TYPE_IS_NOT_UNKNOWN) @@ -454,7 +453,7 @@ struct SFilterInfo { #define FILTER_UNIT_OPTR(u) ((u)->compare.optr) #define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func) -#define FILTER_UNIT_CLR_F(i) memset((i)->unitFlags, 0, (i)->unitNum * sizeof(*info->unitFlags)) +#define FILTER_UNIT_CLR_F(i) (void)memset((i)->unitFlags, 0, (i)->unitNum * sizeof(*info->unitFlags)) #define FILTER_UNIT_SET_F(i, idx) (i)->unitFlags[idx] = 1 #define FILTER_UNIT_GET_F(i, idx) ((i)->unitFlags[idx]) #define FILTER_UNIT_GET_R(i, idx) ((i)->unitRes[idx]) @@ -482,7 +481,7 @@ struct SFilterInfo { #define FILTER_COPY_IDX(dst, src, n) \ do { \ *(dst) = taosMemoryMalloc(sizeof(uint32_t) * n); \ - memcpy(*(dst), src, sizeof(uint32_t) * n); \ + (void)memcpy(*(dst), src, sizeof(uint32_t) * n); \ } while (0) #define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) \ diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index ca08231d97..47a7b6fe7a 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -31,7 +31,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i pPtr = (uint8_t *)tdbOsCalloc(1, zsize); if (pPtr == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } pDb = (TDB *)pPtr; @@ -64,7 +64,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i tsize = sizeof(SPager *) * pDb->nPgrHash; pDb->pgrHash = tdbOsMalloc(tsize); if (pDb->pgrHash == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } memset(pDb->pgrHash, 0, tsize); @@ -125,7 +125,7 @@ int32_t tdbBegin(TDB *pDb, TXN **ppTxn, void *(*xMalloc)(void *, size_t), void ( TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn)); if (!pTxn) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } ret = tdbTxnOpen(pTxn, txnId, xMalloc, xFree, xArg, flags); diff --git a/utils/TSZ/zstd/common/bitstream.h b/utils/TSZ/zstd/common/bitstream.h index 2f91460c5e..6a154ceb57 100644 --- a/utils/TSZ/zstd/common/bitstream.h +++ b/utils/TSZ/zstd/common/bitstream.h @@ -286,7 +286,7 @@ MEM_STATIC size_t BIT_closeCStream(BIT_CStream_t* bitC) */ MEM_STATIC size_t BIT_initDStream(BIT_DStream_t* bitD, const void* srcBuffer, size_t srcSize) { - if (srcSize < 1) { memset(bitD, 0, sizeof(*bitD)); return ERROR(srcSize_wrong); } + if (srcSize < 1) { (void)memset(bitD, 0, sizeof(*bitD)); return ERROR(srcSize_wrong); } bitD->start = (const char*)srcBuffer; bitD->limitPtr = bitD->start + sizeof(bitD->bitContainer);