naming convention
This commit is contained in:
parent
4a365c0d91
commit
16a05a6914
|
@ -103,7 +103,7 @@ typedef struct SBlockKeyTuple {
|
|||
} SBlockKeyTuple;
|
||||
|
||||
typedef struct SBlockKeyInfo {
|
||||
int32_t nBytesAlloc;
|
||||
int32_t maxBytesAlloc;
|
||||
SBlockKeyTuple* pKeyTuple;
|
||||
} SBlockKeyInfo;
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSiz
|
|||
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
|
||||
char *str, char **end);
|
||||
|
||||
static FORCE_INLINE int32_t getPaddingRowSize(STableComInfo *tinfo) {
|
||||
static FORCE_INLINE int32_t getExtendedRowSize(STableComInfo *tinfo) {
|
||||
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN * tinfo->numOfColumns;
|
||||
}
|
||||
int initSMemRowBuilder(SMemRowBuilder *pBuilder, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) {
|
||||
|
@ -427,17 +427,14 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BOOL: { // bool
|
||||
if (isNullStr(pToken)) {
|
||||
// *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
|
||||
if (strncmp(pToken->z, "true", pToken->n) == 0) {
|
||||
// *(uint8_t *)payload = TSDB_TRUE;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &TRUE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
|
||||
// *(uint8_t *)payload = TSDB_FALSE;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &FALSE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
|
@ -446,13 +443,11 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
}
|
||||
} else if (pToken->type == TK_INTEGER) {
|
||||
iv = strtoll(pToken->z, NULL, 10);
|
||||
// *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
|
||||
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type,
|
||||
((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
} else if (pToken->type == TK_FLOAT) {
|
||||
double dv = strtod(pToken->z, NULL);
|
||||
// *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
|
||||
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type,
|
||||
((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
|
@ -465,7 +460,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
|
@ -475,7 +469,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((uint8_t *)payload) = (uint8_t)iv;
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]);
|
||||
|
@ -486,7 +479,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
|
@ -496,7 +488,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((uint8_t *)payload) = (uint8_t)iv;
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]);
|
||||
|
@ -527,7 +518,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
|
@ -537,7 +527,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((uint16_t *)payload) = (uint16_t)iv;
|
||||
uint16_t tmpVal = (uint16_t)iv;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]);
|
||||
|
@ -548,7 +537,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((int32_t *)payload) = TSDB_DATA_INT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
|
@ -558,7 +546,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((int32_t *)payload) = (int32_t)iv;
|
||||
int32_t tmpVal = (int32_t)iv;
|
||||
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_INT]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT]);
|
||||
|
@ -568,7 +555,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
|
@ -578,7 +564,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((uint32_t *)payload) = (uint32_t)iv;
|
||||
uint32_t tmpVal = (uint32_t)iv;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UINT]);
|
||||
|
@ -589,7 +574,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
|
@ -599,8 +583,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((int64_t *)payload) = iv;
|
||||
// int64_t tmpVal = (int64_t)iv;
|
||||
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]);
|
||||
}
|
||||
|
@ -608,7 +590,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
|
@ -618,15 +599,15 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
// *((uint64_t *)payload) = iv;
|
||||
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
|
||||
uint64_t tmpVal = (uint64_t)iv;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
double dv;
|
||||
|
@ -639,8 +620,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
// *((float *)payload) = (float)dv;
|
||||
// SET_FLOAT_VAL(payload, dv);
|
||||
float tmpVal = (float)dv;
|
||||
*sizeAppend =
|
||||
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]);
|
||||
|
@ -650,7 +629,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
if (isNullStr(pToken)) {
|
||||
// *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
|
||||
*isColNull = true;
|
||||
} else {
|
||||
double dv;
|
||||
|
@ -662,7 +640,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
// *((double *)payload) = dv;
|
||||
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &dv, TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]);
|
||||
}
|
||||
|
@ -682,43 +659,41 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
|
||||
payloadColSetId(payload, pSchema->colId);
|
||||
payloadColSetType(payload, pSchema->type);
|
||||
varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, pToken->n);
|
||||
memcpy(varDataVal(payload + PAYLOAD_ID_TYPE_LEN), pToken->z, pToken->n);
|
||||
*sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + pToken->n);
|
||||
varDataSetLen(payloadColValue(payload), pToken->n);
|
||||
memcpy(varDataVal(payloadColValue(payload)), pToken->z, pToken->n);
|
||||
*sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + pToken->n);
|
||||
*dataRowColDeltaLen += (TDRowLenT)(pToken->n - sizeof(uint8_t));
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + sizeof(VarDataLenT) + pToken->n);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + pToken->n);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (pToken->type == TK_NULL) {
|
||||
// setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
|
||||
*isColNull = true;
|
||||
} else {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
int32_t output = 0;
|
||||
payloadColSetId(payload, pSchema->colId);
|
||||
payloadColSetType(payload, pSchema->type);
|
||||
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload + PAYLOAD_ID_TYPE_LEN),
|
||||
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payloadColValue(payload)),
|
||||
pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||
return tscInvalidOperationMsg(msg, buf, pToken->z);
|
||||
}
|
||||
|
||||
varDataSetLen(payload + PAYLOAD_ID_TYPE_LEN, output);
|
||||
varDataSetLen(payloadColValue(payload), output);
|
||||
|
||||
*sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + sizeof(VarDataLenT) + output);
|
||||
*sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + output);
|
||||
*dataRowColDeltaLen += (TDRowLenT)(output - sizeof(uint32_t));
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + sizeof(VarDataLenT) + output);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + output);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||
if (pToken->type == TK_NULL) {
|
||||
if (primaryKey) {
|
||||
// *((int64_t *)payload) = 0;
|
||||
// When building SKVRow primaryKey, we should not skip even with NULL value.
|
||||
int64_t tmpVal = 0;
|
||||
|
||||
|
@ -726,7 +701,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
|
||||
} else {
|
||||
// *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
|
||||
*isColNull = true;
|
||||
}
|
||||
} else {
|
||||
|
@ -735,8 +709,6 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
|
|||
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
|
||||
}
|
||||
|
||||
// *((int64_t *)payload) = tmpVal;
|
||||
|
||||
*sizeAppend = tsSetColumnValue(primaryKey ? primaryKeyStart : payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
|
||||
|
@ -803,8 +775,10 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
TDRowLenT dataRowLen = pBuilder->allNullLen;
|
||||
TDRowLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE;
|
||||
|
||||
char *kvPrimayKeyStart = payload + PAYLOAD_HEADER_LEN;
|
||||
char *kvStart = kvPrimayKeyStart + PLAYLOAD_PRIMARY_COL_LEN; // make sure 1st column tuple is primaryKey
|
||||
ASSERT(dataRowLen > 0);
|
||||
|
||||
char *kvPrimayKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
|
||||
char *kvStart = kvPrimayKeyStart + PAYLOAD_PRIMARY_COL_LEN; // the column tuple behind the primaryKey
|
||||
|
||||
for (int i = 0; i < spd->numOfBound; ++i) {
|
||||
// the start position in data block buffer of current value in sql
|
||||
|
@ -882,8 +856,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
return ret;
|
||||
}
|
||||
|
||||
if (isPrimaryKey &&
|
||||
tsCheckTimestamp(pDataBlocks, payloadKeyAddr(kvStart)) != TSDB_CODE_SUCCESS) {
|
||||
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, payloadColValue(kvPrimayKeyStart)) != TSDB_CODE_SUCCESS) {
|
||||
tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z);
|
||||
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
|
||||
}
|
||||
|
@ -904,7 +877,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
payloadSetType(payload, SMEM_ROW_DATA);
|
||||
}
|
||||
|
||||
*(uint16_t *)(payload + sizeof(uint8_t)) = nColsNotNull;
|
||||
*len = PAYLOAD_HEADER_LEN + rowSizeAppended;
|
||||
|
||||
payloadSetNCols(payload, nColsNotNull);
|
||||
|
@ -938,7 +910,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
|
||||
int32_t precision = tinfo.precision;
|
||||
|
||||
int32_t rowSizeWithColIdType = getPaddingRowSize(&tinfo);
|
||||
int32_t extendedRowSize = getExtendedRowSize(&tinfo);
|
||||
|
||||
initSMemRowBuilder(&pDataBlock->rowBuilder, tscGetTableSchema(pDataBlock->pTableMeta),
|
||||
tscGetNumOfColumns(pDataBlock->pTableMeta), 0);
|
||||
|
@ -949,9 +921,9 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
if (sToken.n == 0 || sToken.type != TK_LP) break;
|
||||
|
||||
*str += index;
|
||||
if ((*numOfRows) >= maxRows || pDataBlock->size + rowSizeWithColIdType >= pDataBlock->nAllocSize) {
|
||||
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
|
||||
int32_t tSize;
|
||||
code = tscAllocateMemIfNeed(pDataBlock, rowSizeWithColIdType, &tSize);
|
||||
code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize);
|
||||
if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
|
||||
strcpy(pInsertParam->msg, "client out of memory");
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -1048,8 +1020,9 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta,
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
// data block is disordered, sort it in ascending order
|
||||
void tscSortRemoveDataBlockDupRowsXX(STableDataBlocks *dataBuf) {
|
||||
static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) {
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
|
||||
|
||||
// size is less than the total size, since duplicated rows may be removed yet.
|
||||
|
@ -1092,12 +1065,13 @@ void tscSortRemoveDataBlockDupRowsXX(STableDataBlocks *dataBuf) {
|
|||
|
||||
dataBuf->prevTS = INT64_MIN;
|
||||
}
|
||||
#endif
|
||||
|
||||
// data block is disordered, sort it in ascending order
|
||||
int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) {
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
|
||||
int16_t nRows = pBlocks->numOfRows;
|
||||
|
||||
int16_t nRows = pBlocks->numOfRows;
|
||||
|
||||
// size is less than the total size, since duplicated rows may be removed yet.
|
||||
|
||||
// if use server time, this block must be ordered
|
||||
|
@ -1105,22 +1079,23 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
|
|||
assert(dataBuf->ordered);
|
||||
}
|
||||
// allocate memory
|
||||
size_t curBlkTupleSize = nRows * sizeof(SBlockKeyTuple);
|
||||
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->nBytesAlloc < curBlkTupleSize) {
|
||||
char *tmp = realloc(pBlkKeyInfo->pKeyTuple, curBlkTupleSize);
|
||||
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
|
||||
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
|
||||
size_t nRealAlloc = nAlloc + 10 * sizeof(SBlockKeyTuple);
|
||||
char * tmp = realloc(pBlkKeyInfo->pKeyTuple, nRealAlloc);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
|
||||
pBlkKeyInfo->nBytesAlloc = (int32_t)curBlkTupleSize;
|
||||
pBlkKeyInfo->maxBytesAlloc = (int32_t)nRealAlloc;
|
||||
}
|
||||
memset(pBlkKeyInfo->pKeyTuple, 0, curBlkTupleSize);
|
||||
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
|
||||
|
||||
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||
char * pBlockData = pBlocks->data;
|
||||
int n = 0;
|
||||
uint32_t totolPayloadLen = 0;
|
||||
TDRowLenT payloadTLen = 0;
|
||||
char * pBlockData = pBlocks->data;
|
||||
uint32_t totolPayloadLen = 0;
|
||||
TDRowLenT payloadTLen = 0;
|
||||
int n = 0;
|
||||
while (n < nRows) {
|
||||
pBlkKeyTuple->skey = payloadKey(pBlockData);
|
||||
pBlkKeyTuple->payloadAddr = pBlockData;
|
||||
|
@ -1170,7 +1145,7 @@ static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char
|
|||
STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta);
|
||||
|
||||
int32_t maxNumOfRows;
|
||||
int32_t code = tscAllocateMemIfNeed(dataBuf, getPaddingRowSize(&tinfo), &maxNumOfRows);
|
||||
int32_t code = tscAllocateMemIfNeed(dataBuf, getExtendedRowSize(&tinfo), &maxNumOfRows);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -1962,7 +1937,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
|
|||
goto _error;
|
||||
}
|
||||
|
||||
tscAllocateMemIfNeed(pTableDataBlock, getPaddingRowSize(&tinfo), &maxRows);
|
||||
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(&tinfo), &maxRows);
|
||||
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
|
||||
if (tokenBuf == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
|
|
@ -1796,7 +1796,8 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
|
|||
}
|
||||
|
||||
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||
int32_t result = TD_DATA_ROW_HEAD_SIZE;
|
||||
// add prefix len of KV type SMemRow(we may use SDataRow or SKVRow)
|
||||
int32_t result = TD_DATA_ROW_HEAD_SIZE + TD_MEM_ROW_KV_TYPE_VER_SIZE;
|
||||
int32_t columns = tscGetNumOfColumns(pTableMeta);
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
||||
for(int32_t i = 0; i < columns; i++) {
|
||||
|
@ -1804,7 +1805,6 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
|||
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||
}
|
||||
}
|
||||
result += TD_MEM_ROW_KV_TYPE_VER_SIZE; // add prefix len of KV type SMemRow(we may use SDataRow or SKVRow)
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1835,7 +1835,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||
int code = 0;
|
||||
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||
|
||||
STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
|
||||
|
||||
|
@ -1883,11 +1883,11 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
}
|
||||
}
|
||||
|
||||
if((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0){
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||
tfree(dataBuf->pData);
|
||||
tfree(blkKeyInfo.pKeyTuple);
|
||||
if ((code = tscSortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||
tfree(dataBuf->pData);
|
||||
tfree(blkKeyInfo.pKeyTuple);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -620,7 +620,7 @@ static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t o
|
|||
#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN)
|
||||
#define PAYLOAD_ID_LEN sizeof(int16_t)
|
||||
#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t))
|
||||
#define PLAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY))
|
||||
#define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY))
|
||||
|
||||
#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN)
|
||||
#define payloadType(r) (*(uint8_t *)(r))
|
||||
|
|
Loading…
Reference in New Issue