Merge pull request #7176 from taosdata/enhance/ljc
enhance: improve inserting logic, handle memory allocation error
This commit is contained in:
commit
d5f10f7ba3
|
@ -328,11 +328,10 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
|
||||||
int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
|
int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
|
||||||
|
|
||||||
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
|
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
|
||||||
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
|
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
|
||||||
void dataColSetOffset(SDataCol *pCol, int nEle);
|
void dataColSetOffset(SDataCol *pCol, int nEle);
|
||||||
|
|
||||||
bool isNEleNull(SDataCol *pCol, int nEle);
|
bool isNEleNull(SDataCol *pCol, int nEle);
|
||||||
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
|
|
||||||
|
|
||||||
// Get the data pointer from a column-wised data
|
// Get the data pointer from a column-wised data
|
||||||
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
|
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
|
||||||
|
@ -357,13 +356,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int maxRowSize;
|
int maxCols; // max number of columns
|
||||||
int maxCols; // max number of columns
|
int maxPoints; // max number of points
|
||||||
int maxPoints; // max number of points
|
int numOfRows;
|
||||||
|
int numOfCols; // Total number of cols
|
||||||
int numOfRows;
|
int sversion; // TODO: set sversion
|
||||||
int numOfCols; // Total number of cols
|
|
||||||
int sversion; // TODO: set sversion
|
|
||||||
SDataCol *cols;
|
SDataCol *cols;
|
||||||
} SDataCols;
|
} SDataCols;
|
||||||
|
|
||||||
|
@ -407,7 +404,7 @@ static FORCE_INLINE TSKEY dataColsKeyLast(SDataCols *pCols) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
SDataCols *tdNewDataCols(int maxCols, int maxRows);
|
||||||
void tdResetDataCols(SDataCols *pCols);
|
void tdResetDataCols(SDataCols *pCols);
|
||||||
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
|
||||||
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
|
||||||
|
|
|
@ -19,10 +19,10 @@
|
||||||
#include "wchar.h"
|
#include "wchar.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
|
||||||
|
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||||
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
|
||||||
int limit2, int tRows, bool forceSetNull);
|
int limit2, int tRows, bool forceSetNull);
|
||||||
|
|
||||||
//TODO: change caller to use return val
|
|
||||||
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
|
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
|
||||||
int spaceNeeded = pCol->bytes * maxPoints;
|
int spaceNeeded = pCol->bytes * maxPoints;
|
||||||
if(IS_VAR_DATA_TYPE(pCol->type)) {
|
if(IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
@ -31,7 +31,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
|
||||||
if(pCol->spaceSize < spaceNeeded) {
|
if(pCol->spaceSize < spaceNeeded) {
|
||||||
void* ptr = realloc(pCol->pData, spaceNeeded);
|
void* ptr = realloc(pCol->pData, spaceNeeded);
|
||||||
if(ptr == NULL) {
|
if(ptr == NULL) {
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)pCol->spaceSize,
|
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -239,20 +239,19 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
|
||||||
pDataCol->len = 0;
|
pDataCol->len = 0;
|
||||||
}
|
}
|
||||||
// value from timestamp should be TKEY here instead of TSKEY
|
// value from timestamp should be TKEY here instead of TSKEY
|
||||||
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
|
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
|
||||||
ASSERT(pCol != NULL && value != NULL);
|
ASSERT(pCol != NULL && value != NULL);
|
||||||
|
|
||||||
if (isAllRowsNull(pCol)) {
|
if (isAllRowsNull(pCol)) {
|
||||||
if (isNull(value, pCol->type)) {
|
if (isNull(value, pCol->type)) {
|
||||||
// all null value yet, just return
|
// all null value yet, just return
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
// Find the first not null value, fill all previouse values as NULL
|
// Find the first not null value, fill all previouse values as NULL
|
||||||
dataColSetNEleNull(pCol, numOfRows, maxPoints);
|
dataColSetNEleNull(pCol, numOfRows);
|
||||||
} else {
|
|
||||||
tdAllocMemForCol(pCol, maxPoints);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,12 +267,21 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP
|
||||||
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
|
||||||
pCol->len += pCol->bytes;
|
pCol->len += pCol->bytes;
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
|
||||||
|
} else {
|
||||||
|
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isNEleNull(SDataCol *pCol, int nEle) {
|
bool isNEleNull(SDataCol *pCol, int nEle) {
|
||||||
if(isAllRowsNull(pCol)) return true;
|
if(isAllRowsNull(pCol)) return true;
|
||||||
for (int i = 0; i < nEle; i++) {
|
for (int i = 0; i < nEle; i++) {
|
||||||
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
|
if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -290,9 +298,7 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
|
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
|
||||||
tdAllocMemForCol(pCol, maxPoints);
|
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
pCol->len = 0;
|
pCol->len = 0;
|
||||||
for (int i = 0; i < nEle; i++) {
|
for (int i = 0; i < nEle; i++) {
|
||||||
|
@ -318,7 +324,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
|
||||||
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
|
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
|
||||||
if (pCols == NULL) {
|
if (pCols == NULL) {
|
||||||
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
|
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
|
||||||
|
@ -326,6 +332,9 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxPoints = maxRows;
|
pCols->maxPoints = maxRows;
|
||||||
|
pCols->maxCols = maxCols;
|
||||||
|
pCols->numOfRows = 0;
|
||||||
|
pCols->numOfCols = 0;
|
||||||
|
|
||||||
if (maxCols > 0) {
|
if (maxCols > 0) {
|
||||||
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
|
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
|
||||||
|
@ -342,13 +351,8 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
|
||||||
pCols->cols[i].pData = NULL;
|
pCols->cols[i].pData = NULL;
|
||||||
pCols->cols[i].dataOff = NULL;
|
pCols->cols[i].dataOff = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxCols = maxCols;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pCols->maxRowSize = maxRowSize;
|
|
||||||
|
|
||||||
|
|
||||||
return pCols;
|
return pCols;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,8 +361,9 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
|
||||||
int oldMaxCols = pCols->maxCols;
|
int oldMaxCols = pCols->maxCols;
|
||||||
if (schemaNCols(pSchema) > oldMaxCols) {
|
if (schemaNCols(pSchema) > oldMaxCols) {
|
||||||
pCols->maxCols = schemaNCols(pSchema);
|
pCols->maxCols = schemaNCols(pSchema);
|
||||||
pCols->cols = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
|
void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
|
||||||
if (pCols->cols == NULL) return -1;
|
if (ptr == NULL) return -1;
|
||||||
|
pCols->cols = ptr;
|
||||||
for(i = oldMaxCols; i < pCols->maxCols; i++) {
|
for(i = oldMaxCols; i < pCols->maxCols; i++) {
|
||||||
pCols->cols[i].pData = NULL;
|
pCols->cols[i].pData = NULL;
|
||||||
pCols->cols[i].dataOff = NULL;
|
pCols->cols[i].dataOff = NULL;
|
||||||
|
@ -366,10 +371,6 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schemaTLen(pSchema) > pCols->maxRowSize) {
|
|
||||||
pCols->maxRowSize = schemaTLen(pSchema);
|
|
||||||
}
|
|
||||||
|
|
||||||
tdResetDataCols(pCols);
|
tdResetDataCols(pCols);
|
||||||
pCols->numOfCols = schemaNCols(pSchema);
|
pCols->numOfCols = schemaNCols(pSchema);
|
||||||
|
|
||||||
|
@ -398,7 +399,7 @@ SDataCols *tdFreeDataCols(SDataCols *pCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
||||||
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
|
SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
|
||||||
if (pRet == NULL) return NULL;
|
if (pRet == NULL) return NULL;
|
||||||
|
|
||||||
pRet->numOfCols = pDataCols->numOfCols;
|
pRet->numOfCols = pDataCols->numOfCols;
|
||||||
|
@ -413,7 +414,10 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
|
||||||
|
|
||||||
if (keepData) {
|
if (keepData) {
|
||||||
if (pDataCols->cols[i].len > 0) {
|
if (pDataCols->cols[i].len > 0) {
|
||||||
tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints);
|
if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) {
|
||||||
|
tdFreeDataCols(pRet);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
pRet->cols[i].len = pDataCols->cols[i].len;
|
pRet->cols[i].len = pDataCols->cols[i].len;
|
||||||
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
|
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
|
||||||
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
|
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
|
||||||
|
@ -584,9 +588,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
|
||||||
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
|
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
|
||||||
for (int i = 0; i < src2->numOfCols; i++) {
|
for (int i = 0; i < src2->numOfCols; i++) {
|
||||||
ASSERT(target->cols[i].type == src2->cols[i].type);
|
ASSERT(target->cols[i].type == src2->cols[i].type);
|
||||||
if (src2->cols[i].len > 0 && (forceSetNull || (!forceSetNull && !isNull(src2->cols[i].pData, src2->cols[i].type)))) {
|
if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
|
||||||
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
|
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
|
||||||
target->maxPoints);
|
target->maxPoints);
|
||||||
|
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
|
||||||
|
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
|
||||||
|
target->maxPoints);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
target->numOfRows++;
|
target->numOfRows++;
|
||||||
|
|
|
@ -722,7 +722,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCommith->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pCommith->pDataCols == NULL) {
|
if (pCommith->pDataCols == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyCommitH(pCommith);
|
tsdbDestroyCommitH(pCommith);
|
||||||
|
@ -920,7 +920,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
|
||||||
SDataCol * pDataCol = pDataCols->cols + ncol;
|
SDataCol * pDataCol = pDataCols->cols + ncol;
|
||||||
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
|
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
|
||||||
|
|
||||||
// if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
|
|
||||||
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
|
if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1277,6 +1276,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
||||||
|
|
||||||
if (key1 < key2) {
|
if (key1 < key2) {
|
||||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||||
|
//TODO: dataColAppendVal may fail
|
||||||
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
||||||
pTarget->maxPoints);
|
pTarget->maxPoints);
|
||||||
}
|
}
|
||||||
|
@ -1308,6 +1308,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
||||||
ASSERT(!isRowDel);
|
ASSERT(!isRowDel);
|
||||||
|
|
||||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||||
|
//TODO: dataColAppendVal may fail
|
||||||
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
||||||
pTarget->maxPoints);
|
pTarget->maxPoints);
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,7 +296,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pComph->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pComph->pDataCols == NULL) {
|
if (pComph->pDataCols == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyCompactH(pComph);
|
tsdbDestroyCompactH(pComph);
|
||||||
|
|
|
@ -702,11 +702,12 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//row1 has higher priority
|
//row1 has higher priority
|
||||||
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, STSchema **ppSchema1, STSchema **ppSchema2, STable* pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) {
|
static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo,
|
||||||
|
STSchema **ppSchema1, STSchema **ppSchema2,
|
||||||
|
STable* pTable, int32_t* pPoints, SMemRow* pLastRow) {
|
||||||
|
|
||||||
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
|
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
|
||||||
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
|
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
|
||||||
(*pAffectedRows)++;
|
|
||||||
(*pPoints)++;
|
(*pPoints)++;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -715,7 +716,6 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
|
||||||
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
|
void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1));
|
||||||
if(pMem == NULL) return NULL;
|
if(pMem == NULL) return NULL;
|
||||||
memRowCpy(pMem, row1);
|
memRowCpy(pMem, row1);
|
||||||
(*pAffectedRows)++;
|
|
||||||
(*pPoints)++;
|
(*pPoints)++;
|
||||||
*pLastRow = pMem;
|
*pLastRow = pMem;
|
||||||
return pMem;
|
return pMem;
|
||||||
|
@ -750,18 +750,16 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
|
||||||
if(pMem == NULL) return NULL;
|
if(pMem == NULL) return NULL;
|
||||||
memRowCpy(pMem, tmp);
|
memRowCpy(pMem, tmp);
|
||||||
|
|
||||||
(*pAffectedRows)++;
|
|
||||||
(*pPoints)++;
|
(*pPoints)++;
|
||||||
|
|
||||||
*pLastRow = pMem;
|
*pLastRow = pMem;
|
||||||
return pMem;
|
return pMem;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* tsdbInsertDupKeyMergePacked(void** args) {
|
static void* tsdbInsertDupKeyMergePacked(void** args) {
|
||||||
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7], args[8]);
|
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pAffectedRows, int64_t* pPoints, SMemRow* pLastRow) {
|
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, SMemRow* pLastRow) {
|
||||||
|
|
||||||
if(pSkipList->insertHandleFn == NULL) {
|
if(pSkipList->insertHandleFn == NULL) {
|
||||||
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
|
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
|
||||||
|
@ -769,17 +767,16 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa
|
||||||
dupHandleSavedFunc->args[3] = NULL;
|
dupHandleSavedFunc->args[3] = NULL;
|
||||||
dupHandleSavedFunc->args[4] = NULL;
|
dupHandleSavedFunc->args[4] = NULL;
|
||||||
dupHandleSavedFunc->args[5] = pTable;
|
dupHandleSavedFunc->args[5] = pTable;
|
||||||
dupHandleSavedFunc->args[6] = pAffectedRows;
|
|
||||||
dupHandleSavedFunc->args[7] = pPoints;
|
|
||||||
dupHandleSavedFunc->args[8] = pLastRow;
|
|
||||||
pSkipList->insertHandleFn = dupHandleSavedFunc;
|
pSkipList->insertHandleFn = dupHandleSavedFunc;
|
||||||
}
|
}
|
||||||
|
pSkipList->insertHandleFn->args[6] = pPoints;
|
||||||
|
pSkipList->insertHandleFn->args[7] = pLastRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
|
static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) {
|
||||||
|
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
int64_t points = 0;
|
int32_t points = 0;
|
||||||
STable *pTable = NULL;
|
STable *pTable = NULL;
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
SMemTable *pMemTable = NULL;
|
SMemTable *pMemTable = NULL;
|
||||||
|
@ -830,9 +827,10 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
|
||||||
|
|
||||||
SMemRow lastRow = NULL;
|
SMemRow lastRow = NULL;
|
||||||
int64_t osize = SL_SIZE(pTableData->pData);
|
int64_t osize = SL_SIZE(pTableData->pData);
|
||||||
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, pAffectedRows, &points, &lastRow);
|
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
|
||||||
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
|
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
|
||||||
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
|
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
|
||||||
|
(*pAffectedRows) += points;
|
||||||
|
|
||||||
|
|
||||||
if(lastRow != NULL) {
|
if(lastRow != NULL) {
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
||||||
#define DEFAULT_TAG_INDEX_COLUMN 0
|
#define DEFAULT_TAG_INDEX_COLUMN 0
|
||||||
|
|
||||||
static int tsdbCompareSchemaVersion(const void *key1, const void *key2);
|
|
||||||
static char * getTagIndexKey(const void *pData);
|
static char * getTagIndexKey(const void *pData);
|
||||||
static STable *tsdbNewTable();
|
static STable *tsdbNewTable();
|
||||||
static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pSTable);
|
static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper, STable *pSTable);
|
||||||
|
|
|
@ -466,7 +466,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||||
assert(pMeta != NULL);
|
assert(pMeta != NULL);
|
||||||
|
|
||||||
pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock);
|
pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock);
|
||||||
if (pQueryHandle->pDataCols == NULL) {
|
if (pQueryHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
|
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -1446,7 +1446,7 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
return midPos;
|
return midPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
|
||||||
char* pData = NULL;
|
char* pData = NULL;
|
||||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1 : -1;
|
||||||
|
|
||||||
|
@ -1481,7 +1481,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
|
||||||
pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pColInfo->info.colId == src->colId) {
|
if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
|
||||||
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
|
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
|
||||||
memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
memmove(pData, (char*)src->pData + bytes * start, bytes * num);
|
||||||
} else { // handle the var-string
|
} else { // handle the var-string
|
||||||
|
|
|
@ -42,14 +42,14 @@ int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReadh->pDCols[0] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pReadh->pDCols[0] == NULL) {
|
if (pReadh->pDCols[0] == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyReadH(pReadh);
|
tsdbDestroyReadH(pReadh);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pReadh->pDCols[1] = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
|
pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
|
||||||
if (pReadh->pDCols[1] == NULL) {
|
if (pReadh->pDCols[1] == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbDestroyReadH(pReadh);
|
tsdbDestroyReadH(pReadh);
|
||||||
|
@ -463,7 +463,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
SDataCol *pDataCol = &(pDataCols->cols[dcol]);
|
||||||
if (dcol != 0 && ccol >= pBlockData->numOfCols) {
|
if (dcol != 0 && ccol >= pBlockData->numOfCols) {
|
||||||
// Set current column as NULL and forward
|
// Set current column as NULL and forward
|
||||||
dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
|
dataColReset(pDataCol);
|
||||||
dcol++;
|
dcol++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -503,7 +503,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
|
||||||
ccol++;
|
ccol++;
|
||||||
} else {
|
} else {
|
||||||
// Set current column as NULL and forward
|
// Set current column as NULL and forward
|
||||||
dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
|
dataColReset(pDataCol);
|
||||||
dcol++;
|
dcol++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -608,7 +608,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockCol == NULL) {
|
if (pBlockCol == NULL) {
|
||||||
dataColSetNEleNull(pDataCol, pBlock->numOfRows, pDataCols->maxPoints);
|
dataColReset(pDataCol);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue