refactor datacols
This commit is contained in:
parent
e11ae1f46e
commit
fcd3b44533
|
@ -330,9 +330,9 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
|
||||||
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
|
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
|
||||||
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
|
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
|
||||||
void dataColSetOffset(SDataCol *pCol, int nEle);
|
void dataColSetOffset(SDataCol *pCol, int nEle);
|
||||||
|
void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||||
|
|
||||||
bool isNEleNull(SDataCol *pCol, int nEle);
|
bool isNEleNull(SDataCol *pCol, int nEle);
|
||||||
int dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
|
|
||||||
|
|
||||||
// Get the data pointer from a column-wised data
|
// Get the data pointer from a column-wised data
|
||||||
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
|
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
|
||||||
|
@ -357,13 +357,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int maxRowSize;
|
int maxCols; // max number of columns
|
||||||
int maxCols; // max number of columns
|
int maxPoints; // max number of points
|
||||||
int maxPoints; // max number of points
|
int numOfRows;
|
||||||
|
int numOfCols; // Total number of cols
|
||||||
int numOfRows;
|
int sversion; // TODO: set sversion
|
||||||
int numOfCols; // Total number of cols
|
|
||||||
int sversion; // TODO: set sversion
|
|
||||||
SDataCol *cols;
|
SDataCol *cols;
|
||||||
} SDataCols;
|
} SDataCols;
|
||||||
|
|
||||||
|
@ -407,7 +405,7 @@ static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
SDataCols *tdNewDataCols(int maxCols, int maxRows);
|
||||||
void tdResetDataCols(SDataCols *pCols);
|
void tdResetDataCols(SDataCols *pCols);
|
||||||
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||||
|
|
|
@ -247,13 +247,10 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
// Find the first not null value, fill all previouse values as NULL
|
// Find the first not null value, fill all previouse values as NULL
|
||||||
if(dataColSetNEleNull(pCol, numOfRows, maxPoints) < 0)
|
dataColSetNEleNull(pCol, numOfRows);
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
if(tdAllocMemForCol(pCol, maxPoints) < 0)
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,13 +266,21 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo
|
||||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
||||||
pCol->len += pCol->bytes;
|
pCol->len += pCol->bytes;
|
||||||
}
|
}
|
||||||
return -1;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
||||||
|
} else {
|
||||||
|
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isNEleNull(SDataCol *pCol, int nEle) {
|
bool isNEleNull(SDataCol *pCol, int nEle) {
|
||||||
if(isAllRowsNull(pCol)) return true;
|
if(isAllRowsNull(pCol)) return true;
|
||||||
for (int i = 0; i < nEle; i++) {
|
for (int i = 0; i < nEle; i++) {
|
||||||
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
|
if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -292,11 +297,7 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
void dataColSetNEleNull(SDataCol *pCol, int nEle) {
|
||||||
if(tdAllocMemForCol(pCol, maxPoints)){
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
pCol->len = 0;
|
pCol->len = 0;
|
||||||
for (int i = 0; i < nEle; i++) {
|
for (int i = 0; i < nEle; i++) {
|
||||||
|
@ -306,7 +307,6 @@ int dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
||||||
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
|
||||||
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
pCol->len = TYPE_BYTES[pCol->type] * nEle;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dataColSetOffset(SDataCol *pCol, int nEle) {
|
void dataColSetOffset(SDataCol *pCol, int nEle) {
|
||||||
|
@ -323,7 +323,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
|
||||||
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
|
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
|
||||||
if (pCols == NULL) {
|
if (pCols == NULL) {
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
|
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
|
||||||
|
@ -331,6 +331,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxPoints = maxRows;
|
pCols->maxPoints = maxRows;
|
||||||
|
pCols->maxCols = maxCols;
|
||||||
|
pCols->numOfRows = 0;
|
||||||
|
pCols->numOfCols = 0;
|
||||||
|
|
||||||
if (maxCols > 0) {
|
if (maxCols > 0) {
|
||||||
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
|
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
|
||||||
|
@ -347,13 +350,8 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
||||||
pCols->cols[i].pData = NULL;
|
pCols->cols[i].pData = NULL;
|
||||||
pCols->cols[i].dataOff = NULL;
|
pCols->cols[i].dataOff = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxCols = maxCols;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxRowSize = maxRowSize;
|
|
||||||
|
|
||||||
|
|
||||||
return pCols;
|
return pCols;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,10 +370,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schemaTLen(pSchema) > pCols->maxRowSize) {
|
|
||||||
pCols->maxRowSize = schemaTLen(pSchema);
|
|
||||||
}
|
|
||||||
|
|
||||||
tdResetDataCols(pCols);
|
tdResetDataCols(pCols);
|
||||||
pCols->numOfCols = schemaNCols(pSchema);
|
pCols->numOfCols = schemaNCols(pSchema);
|
||||||
|
|
||||||
|
@ -404,7 +398,7 @@ SDataCols *tdFreeDataCols(SDataCols *pCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
||||||
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
|
SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
|
||||||
if (pRet == NULL) return NULL;
|
if (pRet == NULL) return NULL;
|
||||||
|
|
||||||
pRet->numOfCols = pDataCols->numOfCols;
|
pRet->numOfCols = pDataCols->numOfCols;
|
||||||
|
@ -593,7 +587,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
||||||
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
|
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
|
||||||
for (int i = 0; i < src2->numOfCols; i++) {
|
for (int i = 0; i < src2->numOfCols; i++) {
|
||||||
ASSERT(target->cols[i].type == src2->cols[i].type);
|
ASSERT(target->cols[i].type == src2->cols[i].type);
|
||||||
if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) {
|
if (src2->cols[i].len > 0 && (forceSetNull || (!isNull(src2->cols[i].pData, src2->cols[i].type)))) {
|
||||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
|
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
|
||||||
target->maxPoints);
|
target->maxPoints);
|
||||||
}
|
}
|
||||||
|
|
|
@ -722,7 +722,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCommith->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pCommith->pDataCols == NULL) {
|
if (pCommith->pDataCols == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyCommitH(pCommith);
|
tsdbDestroyCommitH(pCommith);
|
||||||
|
|
|
@ -296,7 +296,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pComph->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pComph->pDataCols == NULL) {
|
if (pComph->pDataCols == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyCompactH(pComph);
|
tsdbDestroyCompactH(pComph);
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
||||||
#define DEFAULT_TAG_INDEX_COLUMN 0
|
#define DEFAULT_TAG_INDEX_COLUMN 0
|
||||||
|
|
||||||
static int tsdbCompareSchemaVersion(const void *key1, const void *key2);
|
|
||||||
static char * getTagIndexKey(const void *pData);
|
static char * getTagIndexKey(const void *pData);
|
||||||
static STable *tsdbNewTable();
|
static STable *tsdbNewTable();
|
||||||
static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pSTable);
|
static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pSTable);
|
||||||
|
|
|
@ -466,7 +466,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||||
assert(pMeta != NULL);
|
assert(pMeta != NULL);
|
||||||
|
|
||||||
pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock);
|
pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||||
if (pQueryHandle->pDataCols == NULL) {
|
if (pQueryHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -1446,7 +1446,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
return midPos;
|
return midPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
||||||
char* pData = NULL;
|
char* pData = NULL;
|
||||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
||||||
|
|
||||||
|
@ -1481,7 +1481,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
|
||||||
pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pColInfo->info.colId == src->colId) {
|
if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
|
||||||
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
|
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
|
||||||
memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
||||||
} else { // handle the var-string
|
} else { // handle the var-string
|
||||||
|
|
|
@ -42,14 +42,14 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReadh->pDCols[0] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pReadh->pDCols[0] == NULL) {
|
if (pReadh->pDCols[0] == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyReadH(pReadh);
|
tsdbDestroyReadH(pReadh);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReadh->pDCols[1] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pReadh->pDCols[1] == NULL) {
|
if (pReadh->pDCols[1] == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyReadH(pReadh);
|
tsdbDestroyReadH(pReadh);
|
||||||
|
@ -463,8 +463,9 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
||||||
if (dcol != 0 && ccol >= pBlockData->numOfCols) {
|
if (dcol != 0 && ccol >= pBlockData->numOfCols) {
|
||||||
// Set current column as NULL and forward
|
// Set current column as NULL and forward
|
||||||
// TODO: dataColSetNEleNull may fail
|
// TODO: tdAllocMemForCol may fail
|
||||||
dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
|
tdAllocMemForCol(pDataCol, pDataCols->maxPoints);
|
||||||
|
dataColSetNEleNull(pDataCol, pBlock->numOfRows);
|
||||||
dcol++;
|
dcol++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -504,8 +505,9 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
ccol++;
|
ccol++;
|
||||||
} else {
|
} else {
|
||||||
// Set current column as NULL and forward
|
// Set current column as NULL and forward
|
||||||
// TODO: dataColSetNEleNull may fail
|
// TODO: tdAllocMemForCol may fail
|
||||||
dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
|
tdAllocMemForCol(pDataCol, pDataCols->maxPoints);
|
||||||
|
dataColSetNEleNull(pDataCol, pBlock->numOfRows);
|
||||||
dcol++;
|
dcol++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -610,8 +612,9 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockCol == NULL) {
|
if (pBlockCol == NULL) {
|
||||||
// TODO: dataColSetNEleNull may fail
|
// TODO: tdAllocMemForCol may fail
|
||||||
dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
|
tdAllocMemForCol(pDataCol, pDataCols->maxPoints);
|
||||||
|
dataColSetNEleNull(pDataCol, pBlock->numOfRows);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue