From 9de8ca6725960dc5078196db8f78e9a4e6e3e8fa Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Aug 2021 15:29:00 +0800 Subject: [PATCH 01/14] [TD-5694]: alloc mem for datacols dynamically --- src/common/inc/tdataformat.h | 6 +- src/common/src/tdataformat.c | 180 ++++++++++++++------------------- src/tsdb/inc/tsdbRowMergeBuf.h | 4 +- src/tsdb/src/tsdbMeta.c | 2 + src/tsdb/src/tsdbReadImpl.c | 9 ++ 5 files changed, 95 insertions(+), 106 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 47bd8a72b2..99c612c86c 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -325,7 +325,7 @@ typedef struct SDataCol { #define isAllRowsNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); @@ -358,12 +358,12 @@ typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points - int bufSize; + //int bufSize; int numOfRows; int numOfCols; // Total number of cols int sversion; // TODO: set sversion - void * buf; + //void * buf; SDataCol *cols; } SDataCols; diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 8ef3d083c7..ad928211a1 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -207,24 +207,16 @@ SMemRow tdMemRowDup(SMemRow row) { return trow; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { pDataCol->type = colType(pCol); pDataCol->colId = colColId(pCol); pDataCol->bytes = colBytes(pCol); pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->len = 0; - if (IS_VAR_DATA_TYPE(pDataCol->type)) { - pDataCol->dataOff = (VarDataOffsetT *)(*pBuf); - pDataCol->pData = POINTER_SHIFT(*pBuf, sizeof(VarDataOffsetT) * maxPoints); - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize + sizeof(VarDataOffsetT) * maxPoints); - } else { - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - pDataCol->dataOff = NULL; - pDataCol->pData = *pBuf; - *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize); - } + pDataCol->spaceSize = pDataCol->bytes * maxPoints; + pDataCol->pData = NULL; + pDataCol->dataOff = NULL; } // value from timestamp should be TKEY here instead of TSKEY void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { @@ -239,6 +231,15 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP if (numOfRows > 0) { // Find the first not null value, fill all previouse values as NULL dataColSetNEleNull(pCol, numOfRows, maxPoints); + } else { + if(pCol->pData == NULL) { + pCol->pData = malloc(maxPoints * pCol->bytes); + ASSERT(pCol->pData != NULL); + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + ASSERT(pCol->dataOff != NULL); + } + } } } @@ -263,7 +264,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) { return true; } -FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { +static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); @@ -277,6 +278,15 @@ FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { + if(pCol->pData == NULL) { + pCol->pData = malloc(maxPoints * pCol->bytes); + ASSERT(pCol->pData != NULL); + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + ASSERT(pCol->dataOff != NULL); + } + } + if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->len = 0; for (int i = 0; i < nEle; i++) { @@ -324,17 +334,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { } pCols->maxRowSize = maxRowSize; - pCols->bufSize = maxRowSize * maxRows; - if (pCols->bufSize > 0) { - pCols->buf = malloc(pCols->bufSize); - if (pCols->buf == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, - strerror(errno)); - tdFreeDataCols(pCols); - return NULL; - } - } return pCols; } @@ -348,27 +348,31 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { if (schemaTLen(pSchema) > pCols->maxRowSize) { pCols->maxRowSize = schemaTLen(pSchema); - pCols->bufSize = schemaTLen(pSchema) * pCols->maxPoints; - pCols->buf = realloc(pCols->buf, pCols->bufSize); - if (pCols->buf == NULL) return -1; } tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); - void *ptr = pCols->buf; for (int i = 0; i < schemaNCols(pSchema); i++) { - dataColInit(pCols->cols + i, schemaColAt(pSchema, i), &ptr, pCols->maxPoints); - ASSERT((char *)ptr - (char *)(pCols->buf) <= pCols->bufSize); + dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); } return 0; } SDataCols *tdFreeDataCols(SDataCols *pCols) { + int i; if (pCols) { - tfree(pCols->buf); - tfree(pCols->cols); + if(pCols->cols) { + int maxCols = pCols->maxCols; + for(i = 0; i < maxCols; i++) { + SDataCol *pCol = &pCols->cols[i]; + tfree(pCol->pData); + tfree(pCol->dataOff); + } + free(pCols->cols); + pCols->cols = NULL; + } free(pCols); } return NULL; @@ -389,19 +393,17 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].offset = pDataCols->cols[i].offset; pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; - pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf))); - - if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - ASSERT(pDataCols->cols[i].dataOff != NULL); - pRet->cols[i].dataOff = - (int32_t *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].dataOff) - (char *)(pDataCols->buf))); - } + pRet->cols[i].len = 0; + pRet->cols[i].dataOff = NULL; + pRet->cols[i].pData = NULL; if (keepData) { pRet->cols[i].len = pDataCols->cols[i].len; if (pDataCols->cols[i].len > 0) { + pRet->cols[i].pData = malloc(pDataCols->cols[i].bytes * pDataCols->maxPoints); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { + pRet->cols[i].dataOff = malloc(sizeof(VarDataOffsetT) * pDataCols->maxPoints); memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); } } @@ -426,40 +428,27 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols int rcol = 0; int dcol = 0; - if (dataRowDeleted(row)) { - for (; dcol < pCols->numOfCols; dcol++) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (dcol == 0) { - dataColAppendVal(pDataCol, dataRowTuple(row), pCols->numOfRows, pCols->maxPoints); - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - } + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dcol++; + continue; } - } else { - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= schemaNCols(pSchema)) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - dcol++; - continue; - } - STColumn *pRowCol = schemaColAt(pSchema, rcol); - if (pRowCol->colId == pDataCol->colId) { - void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - dcol++; - rcol++; - } else if (pRowCol->colId < pDataCol->colId) { - rcol++; - } else { - if(forceSetNull) { - //dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - } - dcol++; + STColumn *pRowCol = schemaColAt(pSchema, rcol); + if (pRowCol->colId == pDataCol->colId) { + void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + dcol++; + rcol++; + } else if (pRowCol->colId < pDataCol->colId) { + rcol++; + } else { + if(forceSetNull) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } + dcol++; } } pCols->numOfRows++; @@ -471,43 +460,30 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo int rcol = 0; int dcol = 0; - if (kvRowDeleted(row)) { - for (; dcol < pCols->numOfCols; dcol++) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (dcol == 0) { - dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints); - } else { - dataColSetNullAt(pDataCol, pCols->numOfRows); - } + int nRowCols = kvRowNCols(row); + + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + ++dcol; + continue; } - } else { - int nRowCols = kvRowNCols(row); - while (dcol < pCols->numOfCols) { - SDataCol *pDataCol = &(pCols->cols[dcol]); - if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); + SColIdx *colIdx = kvRowColIdxAt(row, rcol); + + if (colIdx->colId == pDataCol->colId) { + void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + ++dcol; + ++rcol; + } else if (colIdx->colId < pDataCol->colId) { + ++rcol; + } else { + if (forceSetNull) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - ++dcol; - continue; - } - - SColIdx *colIdx = kvRowColIdxAt(row, rcol); - - if (colIdx->colId == pDataCol->colId) { - void *value = tdGetKvRowDataOfCol(row, colIdx->offset); - dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - ++dcol; - ++rcol; - } else if (colIdx->colId < pDataCol->colId) { - ++rcol; - } else { - if (forceSetNull) { - // dataColSetNullAt(pDataCol, pCols->numOfRows); - dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - } - ++dcol; } + ++dcol; } } pCols->numOfRows++; diff --git a/src/tsdb/inc/tsdbRowMergeBuf.h b/src/tsdb/inc/tsdbRowMergeBuf.h index 302bf25750..cefa9b27fb 100644 --- a/src/tsdb/inc/tsdbRowMergeBuf.h +++ b/src/tsdb/inc/tsdbRowMergeBuf.h @@ -29,7 +29,9 @@ typedef void* SMergeBuf; SDataRow tsdbMergeTwoRows(SMergeBuf *pBuf, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); static FORCE_INLINE int tsdbMergeBufMakeSureRoom(SMergeBuf *pBuf, STSchema* pSchema1, STSchema* pSchema2) { - return tsdbMakeRoom(pBuf, MAX(dataRowMaxBytesFromSchema(pSchema1), dataRowMaxBytesFromSchema(pSchema2))); + size_t len1 = dataRowMaxBytesFromSchema(pSchema1); + size_t len2 = dataRowMaxBytesFromSchema(pSchema2); + return tsdbMakeRoom(pBuf, MAX(len1, len2)); } static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) { diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f233500ee9..619b32b3d9 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -1035,6 +1035,8 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro } } } + pMeta->maxCols = maxCols; + pMeta->maxRowBytes = maxRowBytes; if (lock) tsdbUnlockRepoMeta(pRepo); tsdbDebug("vgId:%d table %s uid %" PRIu64 " is removed from meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_UID(pTable)); diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 666a2d3571..a16c3ffe6a 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -518,6 +518,15 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return -1; } + if(pDataCol->pData == NULL) { + pDataCol->pData = malloc(maxPoints * pDataCol->bytes); + ASSERT(pDataCol->pData != NULL); + if(IS_VAR_DATA_TYPE(pDataCol->type)) { + pDataCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + ASSERT(pDataCol->dataOff != NULL); + } + } + // Decode the data if (comp) { // Need to decompress From e8d100657bcf800fd51f8e3a843e41b0c54ff6cd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Aug 2021 18:37:57 +0800 Subject: [PATCH 02/14] [TD-5694]: fix memory alloc --- src/common/inc/tdataformat.h | 4 +-- src/common/src/tdataformat.c | 61 +++++++++++++++++++++++------------- src/tsdb/src/tsdbReadImpl.c | 9 +----- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 99c612c86c..53e77430d3 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -325,6 +325,8 @@ typedef struct SDataCol { #define isAllRowsNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } +void tdAllocMemForCol(SDataCol *pCol, int maxPoints); + void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); @@ -358,12 +360,10 @@ typedef struct { int maxRowSize; int maxCols; // max number of columns int maxPoints; // max number of points - //int bufSize; int numOfRows; int numOfCols; // Total number of cols int sversion; // TODO: set sversion - //void * buf; SDataCol *cols; } SDataCols; diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index ad928211a1..077081bfb6 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -22,6 +22,24 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); +void tdAllocMemForCol(SDataCol *pCol, int maxPoints) { + if(pCol->pData == NULL) { + pCol->pData = malloc(maxPoints * pCol->bytes); + pCol->spaceSize = maxPoints * pCol->bytes; + if(pCol->pData == NULL) { + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize, + strerror(errno)); + } + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); + if(pCol->dataOff == NULL) { + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)(maxPoints * sizeof(VarDataOffsetT)), + strerror(errno)); + } + } + } +} + /** * Duplicate the schema and return a new object */ @@ -214,9 +232,6 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->len = 0; - pDataCol->spaceSize = pDataCol->bytes * maxPoints; - pDataCol->pData = NULL; - pDataCol->dataOff = NULL; } // value from timestamp should be TKEY here instead of TSKEY void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { @@ -232,14 +247,7 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP // Find the first not null value, fill all previouse values as NULL dataColSetNEleNull(pCol, numOfRows, maxPoints); } else { - if(pCol->pData == NULL) { - pCol->pData = malloc(maxPoints * pCol->bytes); - ASSERT(pCol->pData != NULL); - if(IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); - ASSERT(pCol->dataOff != NULL); - } - } + tdAllocMemForCol(pCol, maxPoints); } } @@ -277,15 +285,8 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { } void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { - - if(pCol->pData == NULL) { - pCol->pData = malloc(maxPoints * pCol->bytes); - ASSERT(pCol->pData != NULL); - if(IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); - ASSERT(pCol->dataOff != NULL); - } - } + if(isAllRowsNull(pCol)) return; + tdAllocMemForCol(pCol, maxPoints); if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->len = 0; @@ -340,9 +341,24 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { } int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { + int i; + int oldMaxCols = pCols->maxCols; + if(oldMaxCols > 0) { + for(i = 0; i < oldMaxCols; i++) { + if(i >= pSchema->numOfCols || + (pCols->cols[i].spaceSize < pSchema->columns[i].bytes * pCols->maxPoints)) { + tfree(pCols->cols[i].pData); + tfree(pCols->cols[i].dataOff); + } + } + } if (schemaNCols(pSchema) > pCols->maxCols) { pCols->maxCols = schemaNCols(pSchema); pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + for(i = oldMaxCols; i < pCols->maxCols; i++) { + pCols->cols[i].pData = NULL; + pCols->cols[i].dataOff = NULL; + } if (pCols->cols == NULL) return -1; } @@ -353,7 +369,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { tdResetDataCols(pCols); pCols->numOfCols = schemaNCols(pSchema); - for (int i = 0; i < schemaNCols(pSchema); i++) { + for (i = 0; i < schemaNCols(pSchema); i++) { dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); } @@ -392,7 +408,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].bytes = pDataCols->cols[i].bytes; pRet->cols[i].offset = pDataCols->cols[i].offset; - pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; + pRet->cols[i].spaceSize = 0; pRet->cols[i].len = 0; pRet->cols[i].dataOff = NULL; pRet->cols[i].pData = NULL; @@ -400,6 +416,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { if (keepData) { pRet->cols[i].len = pDataCols->cols[i].len; if (pDataCols->cols[i].len > 0) { + pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; pRet->cols[i].pData = malloc(pDataCols->cols[i].bytes * pDataCols->maxPoints); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index a16c3ffe6a..711c32535b 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -518,14 +518,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 return -1; } - if(pDataCol->pData == NULL) { - pDataCol->pData = malloc(maxPoints * pDataCol->bytes); - ASSERT(pDataCol->pData != NULL); - if(IS_VAR_DATA_TYPE(pDataCol->type)) { - pDataCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); - ASSERT(pDataCol->dataOff != NULL); - } - } + tdAllocMemForCol(pDataCol, maxPoints); // Decode the data if (comp) { From 72c26ef481e35494bfac24092d46be89d1e1ed1e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Aug 2021 22:03:59 +0800 Subject: [PATCH 03/14] [TD-5694]: refactor --- src/common/src/tdataformat.c | 53 ++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 077081bfb6..f50445e6e7 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -23,18 +23,20 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i int limit2, int tRows, bool forceSetNull); void tdAllocMemForCol(SDataCol *pCol, int maxPoints) { - if(pCol->pData == NULL) { - pCol->pData = malloc(maxPoints * pCol->bytes); - pCol->spaceSize = maxPoints * pCol->bytes; - if(pCol->pData == NULL) { + int spaceNeeded = pCol->bytes * maxPoints; + if(IS_VAR_DATA_TYPE(pCol->type)) { + spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; + } + if(pCol->spaceSize < spaceNeeded) { + void* ptr = realloc(pCol->pData, spaceNeeded); + if(ptr == NULL) { uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize, strerror(errno)); - } - if(IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff = malloc(maxPoints * sizeof(VarDataOffsetT)); - if(pCol->dataOff == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)(maxPoints * sizeof(VarDataOffsetT)), - strerror(errno)); + } else { + pCol->pData = ptr; + pCol->spaceSize = spaceNeeded; + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = POINTER_SHIFT(ptr, pCol->bytes * maxPoints); } } } @@ -330,6 +332,12 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { tdFreeDataCols(pCols); return NULL; } + int i; + for(i = 0; i < maxCols; i++) { + pCols->cols[i].spaceSize = 0; + pCols->cols[i].pData = NULL; + pCols->cols[i].dataOff = NULL; + } pCols->maxCols = maxCols; } @@ -343,23 +351,15 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { int i; int oldMaxCols = pCols->maxCols; - if(oldMaxCols > 0) { - for(i = 0; i < oldMaxCols; i++) { - if(i >= pSchema->numOfCols || - (pCols->cols[i].spaceSize < pSchema->columns[i].bytes * pCols->maxPoints)) { - tfree(pCols->cols[i].pData); - tfree(pCols->cols[i].dataOff); - } - } - } - if (schemaNCols(pSchema) > pCols->maxCols) { + if (schemaNCols(pSchema) > oldMaxCols) { pCols->maxCols = schemaNCols(pSchema); pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + if (pCols->cols == NULL) return -1; for(i = oldMaxCols; i < pCols->maxCols; i++) { pCols->cols[i].pData = NULL; pCols->cols[i].dataOff = NULL; + pCols->cols[i].spaceSize = 0; } - if (pCols->cols == NULL) return -1; } if (schemaTLen(pSchema) > pCols->maxRowSize) { @@ -384,7 +384,6 @@ SDataCols *tdFreeDataCols(SDataCols *pCols) { for(i = 0; i < maxCols; i++) { SDataCol *pCol = &pCols->cols[i]; tfree(pCol->pData); - tfree(pCol->dataOff); } free(pCols->cols); pCols->cols = NULL; @@ -416,12 +415,14 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { if (keepData) { pRet->cols[i].len = pDataCols->cols[i].len; if (pDataCols->cols[i].len > 0) { - pRet->cols[i].spaceSize = pDataCols->cols[i].spaceSize; - pRet->cols[i].pData = malloc(pDataCols->cols[i].bytes * pDataCols->maxPoints); + int spaceSize = pDataCols->cols[i].bytes * pDataCols->maxPoints; + pRet->cols[i].spaceSize = spaceSize; + pRet->cols[i].pData = malloc(spaceSize); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - pRet->cols[i].dataOff = malloc(sizeof(VarDataOffsetT) * pDataCols->maxPoints); - memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, sizeof(VarDataOffsetT) * pDataCols->maxPoints); + int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; + pRet->cols[i].dataOff = malloc(dataOffSize); + memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); } } } From 7dbf526124dd8a39e6d70edf4a52735c19ce5075 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Aug 2021 22:13:46 +0800 Subject: [PATCH 04/14] [TD-5694]: refactor --- src/common/src/tdataformat.c | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index f50445e6e7..0082c11e4b 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -335,6 +335,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) { int i; for(i = 0; i < maxCols; i++) { pCols->cols[i].spaceSize = 0; + pCols->cols[i].len = 0; pCols->cols[i].pData = NULL; pCols->cols[i].dataOff = NULL; } @@ -407,21 +408,13 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].bytes = pDataCols->cols[i].bytes; pRet->cols[i].offset = pDataCols->cols[i].offset; - pRet->cols[i].spaceSize = 0; - pRet->cols[i].len = 0; - pRet->cols[i].dataOff = NULL; - pRet->cols[i].pData = NULL; - if (keepData) { pRet->cols[i].len = pDataCols->cols[i].len; - if (pDataCols->cols[i].len > 0) { - int spaceSize = pDataCols->cols[i].bytes * pDataCols->maxPoints; - pRet->cols[i].spaceSize = spaceSize; - pRet->cols[i].pData = malloc(spaceSize); + if (pRet->cols[i].len > 0) { + tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints); memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; - pRet->cols[i].dataOff = malloc(dataOffSize); memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); } } From 3b775c190ff6091f33adaab67fd0391eb868633c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 3 Aug 2021 22:52:46 +0800 Subject: [PATCH 05/14] [TD-5694]: fix memory alloc --- src/common/src/tdataformat.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 0082c11e4b..c3615e64fc 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -248,10 +248,9 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP if (numOfRows > 0) { // Find the first not null value, fill all previouse values as NULL dataColSetNEleNull(pCol, numOfRows, maxPoints); - } else { - tdAllocMemForCol(pCol, maxPoints); } } + tdAllocMemForCol(pCol, maxPoints); if (IS_VAR_DATA_TYPE(pCol->type)) { // set offset From 2062b768db6286653c94803ea7cceb5a580e9980 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 00:01:13 +0800 Subject: [PATCH 06/14] [TD-5694]: fix memory alloc --- src/common/src/tdataformat.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index c3615e64fc..c96c916a01 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -267,6 +267,7 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP } bool isNEleNull(SDataCol *pCol, int nEle) { + if(isAllRowsNull(pCol)) return true; for (int i = 0; i < nEle; i++) { if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false; } @@ -360,6 +361,15 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { pCols->cols[i].dataOff = NULL; pCols->cols[i].spaceSize = 0; } + } else if(schemaNCols(pSchema) < oldMaxCols){ + //TODO: this handling should not exist, for alloc will handle it nicely + for(i = schemaNCols(pSchema); i < oldMaxCols; i++) { + tfree(pCols->cols[i].pData); + pCols->cols[i].spaceSize = 0; + } + pCols->maxCols = schemaNCols(pSchema); + pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + if (pCols->cols == NULL) return -1; } if (schemaTLen(pSchema) > pCols->maxRowSize) { From 97bd6a9f9212abc5a4c45e1b5f4e66b0744033f9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 00:18:10 +0800 Subject: [PATCH 07/14] [TD-5694]: fix memory alloc --- src/common/src/tdataformat.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index c96c916a01..16c96bb16f 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -371,6 +371,11 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); if (pCols->cols == NULL) return -1; } + for(i = 0; i < pCols->maxCols; i++) { + tfree(pCols->cols[i].pData); + pCols->cols[i].dataOff = NULL; + pCols->cols[i].spaceSize = 0; + } if (schemaTLen(pSchema) > pCols->maxRowSize) { pCols->maxRowSize = schemaTLen(pSchema); From 42db901a22ac8d438adc4ae680d29c2d736b6d4b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 00:51:35 +0800 Subject: [PATCH 08/14] [TD-5694]: fix --- src/common/inc/tdataformat.h | 2 +- src/common/src/tdataformat.c | 20 +++++--------------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 53e77430d3..fb6bab0cf2 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -325,7 +325,7 @@ typedef struct SDataCol { #define isAllRowsNull(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } -void tdAllocMemForCol(SDataCol *pCol, int maxPoints); +int tdAllocMemForCol(SDataCol *pCol, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 16c96bb16f..44a138cec4 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -22,7 +22,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); -void tdAllocMemForCol(SDataCol *pCol, int maxPoints) { +int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if(IS_VAR_DATA_TYPE(pCol->type)) { spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; @@ -30,8 +30,10 @@ void tdAllocMemForCol(SDataCol *pCol, int maxPoints) { if(pCol->spaceSize < spaceNeeded) { void* ptr = realloc(pCol->pData, spaceNeeded); if(ptr == NULL) { + ASSERT(false); uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize, strerror(errno)); + return -1; } else { pCol->pData = ptr; pCol->spaceSize = spaceNeeded; @@ -40,6 +42,7 @@ void tdAllocMemForCol(SDataCol *pCol, int maxPoints) { } } } + return 0; } /** @@ -361,20 +364,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { pCols->cols[i].dataOff = NULL; pCols->cols[i].spaceSize = 0; } - } else if(schemaNCols(pSchema) < oldMaxCols){ - //TODO: this handling should not exist, for alloc will handle it nicely - for(i = schemaNCols(pSchema); i < oldMaxCols; i++) { - tfree(pCols->cols[i].pData); - pCols->cols[i].spaceSize = 0; - } - pCols->maxCols = schemaNCols(pSchema); - pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); - if (pCols->cols == NULL) return -1; - } - for(i = 0; i < pCols->maxCols; i++) { - tfree(pCols->cols[i].pData); - pCols->cols[i].dataOff = NULL; - pCols->cols[i].spaceSize = 0; } if (schemaTLen(pSchema) > pCols->maxRowSize) { @@ -386,6 +375,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { for (i = 0; i < schemaNCols(pSchema); i++) { dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); + tdAllocMemForCol(pCols->cols + i, pCols->maxPoints); } return 0; From cc451f1fbccdb63cae5265d7ef504761dfe5391d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 01:02:00 +0800 Subject: [PATCH 09/14] [TD-5694]: fix --- src/common/src/tdataformat.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 44a138cec4..9bcada27cb 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -375,6 +375,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { for (i = 0; i < schemaNCols(pSchema); i++) { dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); + pCols->cols[i].spaceSize = 0; tdAllocMemForCol(pCols->cols + i, pCols->maxPoints); } From 7b1fce481ae0324cfe121c902f8059f33dc89133 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 01:12:10 +0800 Subject: [PATCH 10/14] [TD-5694]: fix --- src/common/src/tdataformat.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 9bcada27cb..c4f3dda7b4 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -25,7 +25,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if(IS_VAR_DATA_TYPE(pCol->type)) { - spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; + spaceNeeded += sizeof(VarDataOffsetT) * maxPoints + sizeof(VarDataLenT) * maxPoints; } if(pCol->spaceSize < spaceNeeded) { void* ptr = realloc(pCol->pData, spaceNeeded); From 9d6dbf473699bf20c0d46d7a35003648152c333c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 01:15:29 +0800 Subject: [PATCH 11/14] [TD-5694]: fix --- src/common/src/tdataformat.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index c4f3dda7b4..3a43a90b76 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -25,7 +25,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if(IS_VAR_DATA_TYPE(pCol->type)) { - spaceNeeded += sizeof(VarDataOffsetT) * maxPoints + sizeof(VarDataLenT) * maxPoints; + spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; } if(pCol->spaceSize < spaceNeeded) { void* ptr = realloc(pCol->pData, spaceNeeded); @@ -37,11 +37,11 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { } else { pCol->pData = ptr; pCol->spaceSize = spaceNeeded; - if(IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff = POINTER_SHIFT(ptr, pCol->bytes * maxPoints); - } } } + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); + } return 0; } From 3749b3d120c1ef1941e16a3ffe5e611e884010cf Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 03:04:43 +0800 Subject: [PATCH 12/14] [TD-5694]: fix --- src/common/src/tdataformat.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 3a43a90b76..9293139f52 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -251,9 +251,10 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP if (numOfRows > 0) { // Find the first not null value, fill all previouse values as NULL dataColSetNEleNull(pCol, numOfRows, maxPoints); + } else { + tdAllocMemForCol(pCol, maxPoints); } } - tdAllocMemForCol(pCol, maxPoints); if (IS_VAR_DATA_TYPE(pCol->type)) { // set offset @@ -290,7 +291,6 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { } void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { - if(isAllRowsNull(pCol)) return; tdAllocMemForCol(pCol, maxPoints); if (IS_VAR_DATA_TYPE(pCol->type)) { @@ -414,9 +414,9 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { pRet->cols[i].offset = pDataCols->cols[i].offset; if (keepData) { - pRet->cols[i].len = pDataCols->cols[i].len; - if (pRet->cols[i].len > 0) { + if (pDataCols->cols[i].len > 0) { tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints); + pRet->cols[i].len = pDataCols->cols[i].len; memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; From 391b5c6a3a9615811e2aa4947c5b70e885e4709b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 03:20:16 +0800 Subject: [PATCH 13/14] [TD-5694]: finish --- src/common/src/tdataformat.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 9293139f52..72a08f1a68 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -375,8 +375,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { for (i = 0; i < schemaNCols(pSchema); i++) { dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); - pCols->cols[i].spaceSize = 0; - tdAllocMemForCol(pCols->cols + i, pCols->maxPoints); } return 0; From f7e8569521c5f0af68fb1c2e88726b7eef592288 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 4 Aug 2021 03:22:04 +0800 Subject: [PATCH 14/14] [TD-5694]: finish --- src/common/src/tdataformat.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 72a08f1a68..3f0ab7f93e 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -22,6 +22,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); +//TODO: change caller to use return val int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if(IS_VAR_DATA_TYPE(pCol->type)) { @@ -30,7 +31,6 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { if(pCol->spaceSize < spaceNeeded) { void* ptr = realloc(pCol->pData, spaceNeeded); if(ptr == NULL) { - ASSERT(false); uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize, strerror(errno)); return -1;