refact some code
This commit is contained in:
parent
a4cfb51b99
commit
27b719febd
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_UTIL_TREALLOC_H_
|
||||
#define _TD_UTIL_TREALLOC_H_
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
static FORCE_INLINE int32_t tRealloc(uint8_t **ppBuf, int64_t size) {
|
||||
int32_t code = 0;
|
||||
int64_t bsize = 0;
|
||||
uint8_t *pBuf;
|
||||
|
||||
if (*ppBuf) {
|
||||
bsize = *(int64_t *)((*ppBuf) - sizeof(int64_t));
|
||||
}
|
||||
|
||||
if (bsize >= size) goto _exit;
|
||||
|
||||
if (bsize == 0) bsize = 64;
|
||||
while (bsize < size) {
|
||||
bsize *= 2;
|
||||
}
|
||||
|
||||
pBuf = taosMemoryRealloc(*ppBuf ? (*ppBuf) - sizeof(int64_t) : *ppBuf, bsize + sizeof(int64_t));
|
||||
if (pBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
*(int64_t *)pBuf = bsize;
|
||||
*ppBuf = pBuf + sizeof(int64_t);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tFree(uint8_t *pBuf) {
|
||||
if (pBuf) {
|
||||
taosMemoryFree(pBuf - sizeof(int64_t));
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_TREALLOC_H_*/
|
|
@ -1,55 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_UTIL_MALLOCATOR_H_
|
||||
#define _TD_UTIL_MALLOCATOR_H_
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// Memory allocator
|
||||
#define TD_MEM_ALCT(TYPE) \
|
||||
struct { \
|
||||
void *(*malloc_)(struct TYPE *, uint64_t size); \
|
||||
void (*free_)(struct TYPE *, void *ptr); \
|
||||
}
|
||||
#define TD_MA_MALLOC_FUNC(TMA) (TMA)->malloc_
|
||||
#define TD_MA_FREE_FUNC(TMA) (TMA)->free_
|
||||
|
||||
#define TD_MA_MALLOC(TMA, SIZE) (*((TMA)->malloc_))(TMA, (SIZE))
|
||||
#define TD_MA_FREE(TMA, PTR) (*((TMA)->free_))(TMA, (PTR))
|
||||
|
||||
typedef struct SMemAllocator {
|
||||
void *impl;
|
||||
TD_MEM_ALCT(SMemAllocator);
|
||||
} SMemAllocator;
|
||||
|
||||
#define tMalloc(pMA, SIZE) TD_MA_MALLOC(PMA, SIZE)
|
||||
#define tFree(pMA, PTR) TD_MA_FREE(PMA, PTR)
|
||||
|
||||
typedef struct SMemAllocatorFactory {
|
||||
void *impl;
|
||||
SMemAllocator *(*create)(struct SMemAllocatorFactory *);
|
||||
void (*destroy)(struct SMemAllocatorFactory *, SMemAllocator *);
|
||||
} SMemAllocatorFactory;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_MALLOCATOR_H_*/
|
|
@ -28,7 +28,6 @@
|
|||
|
||||
#include "tcommon.h"
|
||||
#include "tfs.h"
|
||||
#include "tmallocator.h"
|
||||
#include "tmsg.h"
|
||||
#include "trow.h"
|
||||
|
||||
|
|
|
@ -148,9 +148,6 @@ int32_t tCmprDelIdx(void const *lhs, void const *rhs);
|
|||
// SDelData
|
||||
int32_t tPutDelData(uint8_t *p, void *ph);
|
||||
int32_t tGetDelData(uint8_t *p, void *ph);
|
||||
// memory
|
||||
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size);
|
||||
void tsdbFree(uint8_t *pBuf);
|
||||
// SMapData
|
||||
#define tMapDataInit() ((SMapData){0})
|
||||
void tMapDataReset(SMapData *pMapData);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "filter.h"
|
||||
#include "qworker.h"
|
||||
#include "sync.h"
|
||||
#include "tRealloc.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcompare.h"
|
||||
|
@ -34,7 +35,6 @@
|
|||
#include "tlockfree.h"
|
||||
#include "tlosertree.h"
|
||||
#include "tlrucache.h"
|
||||
#include "tmallocator.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tref.h"
|
||||
#include "tskiplist.h"
|
||||
|
|
|
@ -99,7 +99,7 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf
|
|||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
|
@ -127,12 +127,12 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf
|
|||
pDelIdx->size = size;
|
||||
pWriter->fDel.size += size;
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d failed to write del data since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,7 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf)
|
|||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
|
@ -178,12 +178,12 @@ int32_t tsdbWriteDelIdx(SDelFWriter *pWriter, SArray *aDelIdx, uint8_t **ppBuf)
|
|||
pWriter->fDel.offset = pWriter->fDel.size;
|
||||
pWriter->fDel.size += size;
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d write del idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -253,7 +253,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
|
|||
#if 0
|
||||
// load and check hdr if buffer is given
|
||||
if (ppBuf) {
|
||||
code = tsdbRealloc(ppBuf, TSDB_FHDR_SIZE);
|
||||
code = tRealloc(ppBuf, TSDB_FHDR_SIZE);
|
||||
if (code) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -319,7 +319,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
|
|||
}
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
|
@ -351,12 +351,12 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
|
|||
|
||||
ASSERT(n == size - sizeof(TSCKSUM));
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -378,7 +378,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf) {
|
|||
}
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// read
|
||||
|
@ -534,7 +534,7 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
|
|||
if (!ppBuf) ppBuf = &pBuf;
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// seek
|
||||
|
@ -576,12 +576,12 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppB
|
|||
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -597,7 +597,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
|
|||
if (!ppBuf) ppBuf = &pBuf;
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// seek
|
||||
|
@ -637,12 +637,12 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl
|
|||
n += tn;
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -656,9 +656,9 @@ static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pSubBlock->nRow);
|
||||
code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pSubBlock->nRow);
|
||||
if (code) goto _err;
|
||||
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow);
|
||||
code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
|
||||
|
@ -673,7 +673,7 @@ static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock
|
|||
} else {
|
||||
size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
|
||||
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
@ -720,10 +720,10 @@ static int32_t tsdbReadColDataImpl(SSubBlock *pSubBlock, SBlockCol *pBlockCol, S
|
|||
ASSERT(pBlockCol->szBitmap);
|
||||
|
||||
size = BIT2_SIZE(pColData->nVal);
|
||||
code = tsdbRealloc(&pColData->pBitMap, size);
|
||||
code = tRealloc(&pColData->pBitMap, size);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
|
||||
code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
|
||||
if (code) goto _err;
|
||||
|
||||
n = tsDecompressTinyint(pBuf, pBlockCol->szBitmap, size, pColData->pBitMap, size, TWO_STAGE_COMP, *ppBuf,
|
||||
|
@ -744,10 +744,10 @@ static int32_t tsdbReadColDataImpl(SSubBlock *pSubBlock, SBlockCol *pBlockCol, S
|
|||
ASSERT(pBlockCol->szOffset);
|
||||
|
||||
size = sizeof(int32_t) * pColData->nVal;
|
||||
code = tsdbRealloc((uint8_t **)&pColData->aOffset, size);
|
||||
code = tRealloc((uint8_t **)&pColData->aOffset, size);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
|
||||
code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES);
|
||||
if (code) goto _err;
|
||||
|
||||
n = tsDecompressInt(pBuf, pBlockCol->szOffset, pColData->nVal, (char *)pColData->aOffset, size, TWO_STAGE_COMP,
|
||||
|
@ -766,14 +766,14 @@ static int32_t tsdbReadColDataImpl(SSubBlock *pSubBlock, SBlockCol *pBlockCol, S
|
|||
// VALUE
|
||||
pColData->nData = pBlockCol->szOrigin;
|
||||
|
||||
code = tsdbRealloc(&pColData->pData, pColData->nData);
|
||||
code = tRealloc(&pColData->pData, pColData->nData);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
|
||||
memcpy(pColData->pData, pBuf, pColData->nData);
|
||||
} else {
|
||||
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
|
||||
code = tsdbRealloc(ppBuf, pColData->nData + COMP_OVERFLOW_BYTES);
|
||||
code = tRealloc(ppBuf, pColData->nData + COMP_OVERFLOW_BYTES);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
@ -846,7 +846,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S
|
|||
size = pSubBlock->szBlockCol + sizeof(TSCKSUM) + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM);
|
||||
}
|
||||
|
||||
code = tsdbRealloc(ppBuf1, size);
|
||||
code = tRealloc(ppBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
n = taosLSeekFile(pFD, offset, SEEK_SET);
|
||||
|
@ -906,7 +906,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S
|
|||
pSubBlock->szTSKEY + sizeof(TSCKSUM) + pBlockCol->offset;
|
||||
size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM);
|
||||
|
||||
code = tsdbRealloc(ppBuf1, size);
|
||||
code = tRealloc(ppBuf1, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// seek
|
||||
|
@ -982,14 +982,14 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
|
|||
tBlockDataClear(pBlockData2);
|
||||
}
|
||||
|
||||
tsdbFree(pBuf1);
|
||||
tsdbFree(pBuf2);
|
||||
tFree(pBuf1);
|
||||
tFree(pBuf2);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read col data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf1);
|
||||
tsdbFree(pBuf2);
|
||||
tFree(pBuf1);
|
||||
tFree(pBuf2);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1006,7 +1006,7 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx,
|
|||
tBlockDataReset(pBlockData);
|
||||
|
||||
// realloc
|
||||
code = tsdbRealloc(ppBuf1, pSubBlock->szBlock);
|
||||
code = tRealloc(ppBuf1, pSubBlock->szBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
// seek
|
||||
|
@ -1128,14 +1128,14 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p
|
|||
ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0);
|
||||
ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0);
|
||||
|
||||
if (pBuf1) tsdbFree(pBuf1);
|
||||
if (pBuf2) tsdbFree(pBuf2);
|
||||
if (pBuf1) tFree(pBuf1);
|
||||
if (pBuf2) tFree(pBuf2);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
if (pBuf1) tsdbFree(pBuf1);
|
||||
if (pBuf2) tsdbFree(pBuf2);
|
||||
if (pBuf1) tFree(pBuf1);
|
||||
if (pBuf2) tFree(pBuf2);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1150,7 +1150,7 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
|
|||
ASSERT(tBlockHasSma(pBlock));
|
||||
|
||||
if (!ppBuf) ppBuf = &pBuf;
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// lseek
|
||||
|
@ -1182,12 +1182,12 @@ int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnD
|
|||
}
|
||||
}
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1440,7 +1440,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
|
|||
size += sizeof(TSCKSUM);
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
|
@ -1464,12 +1464,12 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **pp
|
|||
pHeadFile->offset = pHeadFile->size;
|
||||
pHeadFile->size += size;
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d write block idx failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1488,7 +1488,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf,
|
|||
|
||||
// alloc
|
||||
if (!ppBuf) ppBuf = &pBuf;
|
||||
code = tsdbRealloc(ppBuf, size);
|
||||
code = tRealloc(ppBuf, size);
|
||||
if (code) goto _err;
|
||||
|
||||
// build
|
||||
|
@ -1512,13 +1512,13 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf,
|
|||
pBlockIdx->size = size;
|
||||
pHeadFile->size += size;
|
||||
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
tsdbTrace("vgId:%d write block, offset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), pBlockIdx->offset,
|
||||
pBlockIdx->size);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbFree(pBuf);
|
||||
tFree(pBuf);
|
||||
tsdbError("vgId:%d write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
@ -1557,7 +1557,7 @@ static int32_t tsdbWriteBlockDataKey(SSubBlock *pSubBlock, SBlockData *pBlockDat
|
|||
pSubBlock->szVersion = sizeof(int64_t) * pSubBlock->nRow;
|
||||
pSubBlock->szTSKEY = sizeof(TSKEY) * pSubBlock->nRow;
|
||||
|
||||
code = tsdbRealloc(ppBuf1, *nDataP + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM));
|
||||
code = tRealloc(ppBuf1, *nDataP + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM));
|
||||
if (code) goto _err;
|
||||
|
||||
// VERSION
|
||||
|
@ -1568,12 +1568,12 @@ static int32_t tsdbWriteBlockDataKey(SSubBlock *pSubBlock, SBlockData *pBlockDat
|
|||
} else {
|
||||
size = (sizeof(int64_t) + sizeof(TSKEY)) * pSubBlock->nRow + COMP_OVERFLOW_BYTES * 2;
|
||||
|
||||
code = tsdbRealloc(ppBuf1, *nDataP + size + sizeof(TSCKSUM));
|
||||
code = tRealloc(ppBuf1, *nDataP + size + sizeof(TSCKSUM));
|
||||
if (code) goto _err;
|
||||
|
||||
tsize = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES;
|
||||
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
|
||||
code = tsdbRealloc(ppBuf2, tsize);
|
||||
code = tRealloc(ppBuf2, tsize);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
@ -1619,10 +1619,10 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBl
|
|||
if (pColData->flag != HAS_VALUE) {
|
||||
size = BIT2_SIZE(pColData->nVal) + COMP_OVERFLOW_BYTES;
|
||||
|
||||
code = tsdbRealloc(ppBuf1, *nDataP + n + size);
|
||||
code = tRealloc(ppBuf1, *nDataP + n + size);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbRealloc(ppBuf2, size);
|
||||
code = tRealloc(ppBuf2, size);
|
||||
if (code) goto _err;
|
||||
|
||||
pBlockCol->szBitmap =
|
||||
|
@ -1641,10 +1641,10 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBl
|
|||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
size = sizeof(int32_t) * pColData->nVal + COMP_OVERFLOW_BYTES;
|
||||
|
||||
code = tsdbRealloc(ppBuf1, *nDataP + n + size);
|
||||
code = tRealloc(ppBuf1, *nDataP + n + size);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbRealloc(ppBuf2, size);
|
||||
code = tRealloc(ppBuf2, size);
|
||||
if (code) goto _err;
|
||||
|
||||
pBlockCol->szOffset = tsCompressInt((char *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, pColData->nVal,
|
||||
|
@ -1662,18 +1662,18 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBl
|
|||
if (pSubBlock->cmprAlg == NO_COMPRESSION) {
|
||||
pBlockCol->szValue = pColData->nData;
|
||||
|
||||
code = tsdbRealloc(ppBuf1, *nDataP + n + pBlockCol->szValue + sizeof(TSCKSUM));
|
||||
code = tRealloc(ppBuf1, *nDataP + n + pBlockCol->szValue + sizeof(TSCKSUM));
|
||||
if (code) goto _err;
|
||||
|
||||
memcpy(*ppBuf1 + *nDataP + n, pColData->pData, pBlockCol->szValue);
|
||||
} else {
|
||||
size = pColData->nData + COMP_OVERFLOW_BYTES;
|
||||
|
||||
code = tsdbRealloc(ppBuf1, *nDataP + n + size + sizeof(TSCKSUM));
|
||||
code = tRealloc(ppBuf1, *nDataP + n + size + sizeof(TSCKSUM));
|
||||
if (code) goto _err;
|
||||
|
||||
if (pSubBlock->cmprAlg == TWO_STAGE_COMP) {
|
||||
code = tsdbRealloc(ppBuf2, size);
|
||||
code = tRealloc(ppBuf2, size);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
|
@ -1713,7 +1713,7 @@ static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SSubBlock *pSubBlock, SBloc
|
|||
pSubBlock->szBlockCol += tPutBlockCol(NULL, taosArrayGet(aBlockCol, iBlockCol));
|
||||
}
|
||||
|
||||
code = tsdbRealloc(ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM));
|
||||
code = tRealloc(ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM));
|
||||
if (code) goto _err;
|
||||
|
||||
n = 0;
|
||||
|
@ -1762,7 +1762,7 @@ static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBloc
|
|||
if (pSubBlock->nSma == 0) goto _exit;
|
||||
|
||||
// calc
|
||||
code = tsdbRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
|
||||
code = tRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
|
||||
if (code) goto _err;
|
||||
n = 0;
|
||||
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) {
|
||||
|
@ -1882,15 +1882,15 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
|
|||
}
|
||||
|
||||
_exit:
|
||||
tsdbFree(pBuf1);
|
||||
tsdbFree(pBuf2);
|
||||
tFree(pBuf1);
|
||||
tFree(pBuf2);
|
||||
taosArrayDestroy(aBlockCol);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d write block data failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
|
||||
tsdbFree(pBuf1);
|
||||
tsdbFree(pBuf2);
|
||||
tFree(pBuf1);
|
||||
tFree(pBuf2);
|
||||
taosArrayDestroy(aBlockCol);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -22,8 +22,8 @@ void tMapDataReset(SMapData *pMapData) {
|
|||
}
|
||||
|
||||
void tMapDataClear(SMapData *pMapData) {
|
||||
tsdbFree((uint8_t *)pMapData->aOffset);
|
||||
tsdbFree(pMapData->pData);
|
||||
tFree((uint8_t *)pMapData->aOffset);
|
||||
tFree(pMapData->pData);
|
||||
}
|
||||
|
||||
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
|
||||
|
@ -35,9 +35,9 @@ int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(u
|
|||
pMapData->nData += tPutItemFn(NULL, pItem);
|
||||
|
||||
// alloc
|
||||
code = tsdbRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
|
||||
code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem);
|
||||
if (code) goto _err;
|
||||
code = tsdbRealloc(&pMapData->pData, pMapData->nData);
|
||||
code = tRealloc(&pMapData->pData, pMapData->nData);
|
||||
if (code) goto _err;
|
||||
|
||||
// put
|
||||
|
@ -109,14 +109,14 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
|
|||
|
||||
n += tGetI32v(p + n, &pMapData->nItem);
|
||||
if (pMapData->nItem) {
|
||||
if (tsdbRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1;
|
||||
if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1;
|
||||
|
||||
for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) {
|
||||
n += tGetI32v(p + n, &pMapData->aOffset[iItem]);
|
||||
}
|
||||
|
||||
n += tGetI32v(p + n, &pMapData->nData);
|
||||
if (tsdbRealloc(&pMapData->pData, pMapData->nData)) return -1;
|
||||
if (tRealloc(&pMapData->pData, pMapData->nData)) return -1;
|
||||
memcpy(pMapData->pData, p + n, pMapData->nData);
|
||||
n += pMapData->nData;
|
||||
}
|
||||
|
@ -124,42 +124,6 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) {
|
|||
return n;
|
||||
}
|
||||
|
||||
// Memory =======================================================================
|
||||
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size) {
|
||||
int32_t code = 0;
|
||||
int64_t bsize = 0;
|
||||
uint8_t *pBuf;
|
||||
|
||||
if (*ppBuf) {
|
||||
bsize = *(int64_t *)((*ppBuf) - sizeof(int64_t));
|
||||
}
|
||||
|
||||
if (bsize >= size) goto _exit;
|
||||
|
||||
if (bsize == 0) bsize = 64;
|
||||
while (bsize < size) {
|
||||
bsize *= 2;
|
||||
}
|
||||
|
||||
pBuf = taosMemoryRealloc(*ppBuf ? (*ppBuf) - sizeof(int64_t) : *ppBuf, bsize + sizeof(int64_t));
|
||||
if (pBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
*(int64_t *)pBuf = bsize;
|
||||
*ppBuf = pBuf + sizeof(int64_t);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbFree(uint8_t *pBuf) {
|
||||
if (pBuf) {
|
||||
taosMemoryFree(pBuf - sizeof(int64_t));
|
||||
}
|
||||
}
|
||||
|
||||
// TABLEID =======================================================================
|
||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2) {
|
||||
TABLEID *pId1 = (TABLEID *)p1;
|
||||
|
@ -796,9 +760,9 @@ void tColDataReset(SColData *pColData) {
|
|||
void tColDataClear(void *ph) {
|
||||
SColData *pColData = (SColData *)ph;
|
||||
|
||||
tsdbFree(pColData->pBitMap);
|
||||
tsdbFree((uint8_t *)pColData->aOffset);
|
||||
tsdbFree(pColData->pData);
|
||||
tFree(pColData->pBitMap);
|
||||
tFree((uint8_t *)pColData->aOffset);
|
||||
tFree(pColData->pData);
|
||||
}
|
||||
|
||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
|
||||
|
@ -812,7 +776,7 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
|
|||
|
||||
// realloc bitmap
|
||||
size = BIT2_SIZE(pColData->nVal + 1);
|
||||
code = tsdbRealloc(&pColData->pBitMap, size);
|
||||
code = tRealloc(&pColData->pBitMap, size);
|
||||
if (code) goto _exit;
|
||||
|
||||
// put value
|
||||
|
@ -830,19 +794,19 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
|
|||
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// offset
|
||||
code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * (pColData->nVal + 1));
|
||||
code = tRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * (pColData->nVal + 1));
|
||||
if (code) goto _exit;
|
||||
pColData->aOffset[pColData->nVal] = pColData->nData;
|
||||
|
||||
// value
|
||||
if ((!pColVal->isNone) && (!pColVal->isNull)) {
|
||||
code = tsdbRealloc(&pColData->pData, pColData->nData + pColVal->value.nData);
|
||||
code = tRealloc(&pColData->pData, pColData->nData + pColVal->value.nData);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColData->pData + pColData->nData, pColVal->value.pData, pColVal->value.nData);
|
||||
pColData->nData += pColVal->value.nData;
|
||||
}
|
||||
} else {
|
||||
code = tsdbRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, pValue, pColVal->type));
|
||||
code = tRealloc(&pColData->pData, pColData->nData + tPutValue(NULL, pValue, pColVal->type));
|
||||
if (code) goto _exit;
|
||||
pColData->nData += tPutValue(pColData->pData + pColData->nData, pValue, pColVal->type);
|
||||
}
|
||||
|
@ -864,20 +828,20 @@ int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest) {
|
|||
pColDataDest->flag = pColDataSrc->flag;
|
||||
|
||||
size = BIT2_SIZE(pColDataSrc->nVal);
|
||||
code = tsdbRealloc(&pColDataDest->pBitMap, size);
|
||||
code = tRealloc(&pColDataDest->pBitMap, size);
|
||||
if (code) goto _exit;
|
||||
memcpy(pColDataDest->pBitMap, pColDataSrc->pBitMap, size);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColDataDest->type)) {
|
||||
size = sizeof(int32_t) * pColDataSrc->nVal;
|
||||
|
||||
code = tsdbRealloc((uint8_t **)&pColDataDest->aOffset, size);
|
||||
code = tRealloc((uint8_t **)&pColDataDest->aOffset, size);
|
||||
if (code) goto _exit;
|
||||
|
||||
memcpy(pColDataDest->aOffset, pColDataSrc->aOffset, size);
|
||||
}
|
||||
|
||||
code = tsdbRealloc(&pColDataDest->pData, pColDataSrc->nData);
|
||||
code = tRealloc(&pColDataDest->pData, pColDataSrc->nData);
|
||||
if (code) goto _exit;
|
||||
pColDataDest->nData = pColDataSrc->nData;
|
||||
memcpy(pColDataDest->pData, pColDataSrc->pData, pColDataDest->nData);
|
||||
|
@ -970,8 +934,8 @@ void tBlockDataReset(SBlockData *pBlockData) {
|
|||
}
|
||||
|
||||
void tBlockDataClear(SBlockData *pBlockData) {
|
||||
tsdbFree((uint8_t *)pBlockData->aVersion);
|
||||
tsdbFree((uint8_t *)pBlockData->aTSKEY);
|
||||
tFree((uint8_t *)pBlockData->aVersion);
|
||||
tFree((uint8_t *)pBlockData->aTSKEY);
|
||||
taosArrayDestroy(pBlockData->aColDataP);
|
||||
taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
|
||||
}
|
||||
|
@ -1033,9 +997,9 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
|
|||
int32_t code = 0;
|
||||
|
||||
// TSDBKEY
|
||||
code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
|
||||
code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1));
|
||||
if (code) goto _err;
|
||||
code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
|
||||
code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1));
|
||||
if (code) goto _err;
|
||||
pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow);
|
||||
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
|
||||
|
@ -1196,9 +1160,9 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) {
|
|||
|
||||
pBlockDataDest->nRow = pBlockDataSrc->nRow;
|
||||
// TSDBKEY
|
||||
code = tsdbRealloc((uint8_t **)&pBlockDataDest->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
|
||||
code = tRealloc((uint8_t **)&pBlockDataDest->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
|
||||
if (code) goto _exit;
|
||||
code = tsdbRealloc((uint8_t **)&pBlockDataDest->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
|
||||
code = tRealloc((uint8_t **)&pBlockDataDest->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
|
||||
if (code) goto _exit;
|
||||
memcpy(pBlockDataDest->aVersion, pBlockDataSrc->aVersion, sizeof(int64_t) * pBlockDataSrc->nRow);
|
||||
memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
|
||||
|
|
|
@ -1,110 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tmallocator.h"
|
||||
|
||||
/* ------------------------ HEAP ALLOCATOR ------------------------ */
|
||||
#if 0
|
||||
typedef struct {
|
||||
size_t tusage;
|
||||
} SHeapAllocator;
|
||||
|
||||
static void * haMalloc(SMemAllocator *pma, size_t size);
|
||||
static void * haCalloc(SMemAllocator *pma, size_t nmemb, size_t size);
|
||||
static void * haRealloc(SMemAllocator *pma, void *ptr, size_t size);
|
||||
static void haFree(SMemAllocator *pma, void *ptr);
|
||||
static size_t haUsage(SMemAllocator *pma);
|
||||
|
||||
SMemAllocator *tdCreateHeapAllocator() {
|
||||
SMemAllocator *pma = NULL;
|
||||
|
||||
pma = taosMemoryCalloc(1, sizeof(SMemAllocator) + sizeof(SHeapAllocator));
|
||||
if (pma) {
|
||||
pma->impl = POINTER_SHIFT(pma, sizeof(SMemAllocator));
|
||||
pma->malloc = haMalloc;
|
||||
pma->calloc = haCalloc;
|
||||
pma->realloc = haRealloc;
|
||||
pma->free = haFree;
|
||||
pma->usage = haUsage;
|
||||
}
|
||||
|
||||
return pma;
|
||||
}
|
||||
|
||||
void tdDestroyHeapAllocator(SMemAllocator *pMemAllocator) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
static void *haMalloc(SMemAllocator *pma, size_t size) {
|
||||
void * ptr;
|
||||
size_t tsize = size + sizeof(size_t);
|
||||
SHeapAllocator *pha = (SHeapAllocator *)(pma->impl);
|
||||
|
||||
ptr = taosMemoryMalloc(tsize);
|
||||
if (ptr) {
|
||||
*(size_t *)ptr = size;
|
||||
ptr = POINTER_SHIFT(ptr, sizeof(size_t));
|
||||
atomic_fetch_add_64(&(pha->tusage), tsize);
|
||||
}
|
||||
|
||||
return ptr;
|
||||
}
|
||||
|
||||
static void *haCalloc(SMemAllocator *pma, size_t nmemb, size_t size) {
|
||||
void * ptr;
|
||||
size_t tsize = nmemb * size;
|
||||
|
||||
ptr = haMalloc(pma, tsize);
|
||||
if (ptr) {
|
||||
memset(ptr, 0, tsize);
|
||||
}
|
||||
|
||||
return ptr;
|
||||
}
|
||||
|
||||
static void *haRealloc(SMemAllocator *pma, void *ptr, size_t size) {
|
||||
size_t psize;
|
||||
size_t tsize = size + sizeof(size_t);
|
||||
|
||||
if (ptr == NULL) {
|
||||
psize = 0;
|
||||
} else {
|
||||
psize = *(size_t *)POINTER_SHIFT(ptr, -sizeof(size_t));
|
||||
}
|
||||
|
||||
if (psize < size) {
|
||||
// TODO
|
||||
} else {
|
||||
return ptr;
|
||||
}
|
||||
}
|
||||
|
||||
static void haFree(SMemAllocator *pma, void *ptr) { /* TODO */
|
||||
SHeapAllocator *pha = (SHeapAllocator *)(pma->impl);
|
||||
if (ptr) {
|
||||
size_t tsize = *(size_t *)POINTER_SHIFT(ptr, -sizeof(size_t)) + sizeof(size_t);
|
||||
atomic_fetch_sub_64(&(pha->tusage), tsize);
|
||||
taosMemoryFree(POINTER_SHIFT(ptr, -sizeof(size_t)));
|
||||
}
|
||||
}
|
||||
|
||||
static size_t haUsage(SMemAllocator *pma) { return ((SHeapAllocator *)(pma->impl))->tusage; }
|
||||
|
||||
/* ------------------------ ARENA ALLOCATOR ------------------------ */
|
||||
typedef struct {
|
||||
size_t usage;
|
||||
} SArenaAllocator;
|
||||
#endif
|
Loading…
Reference in New Issue