update definition of SSchema/STColumn to support columns larger than 1MB(int16_t to int32_t)

This commit is contained in:
Cary Xu 2022-03-27 10:42:23 +08:00
parent 2dfecab056
commit d4c636589a
27 changed files with 133 additions and 112 deletions

View File

@ -116,7 +116,7 @@ typedef struct SParsedDataColInfo {
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow) uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
uint16_t extendedVarLen; uint16_t extendedVarLen;
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part) uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
int32_t * boundedColumns; // bound column idx according to schema int32_t *boundColumns; // bound column idx according to schema
SBoundColumn * cols; SBoundColumn * cols;
SBoundIdxInfo *colIdxInfo; SBoundIdxInfo *colIdxInfo;
int8_t orderStatus; // bound columns int8_t orderStatus; // bound columns
@ -125,7 +125,7 @@ typedef struct SParsedDataColInfo {
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED) #define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
typedef struct { typedef struct {
uint8_t memRowType; // default is 0, that is SDataRow uint8_t rowType; // default is 0, that is SDataRow
int32_t rowSize; int32_t rowSize;
} SMemRowBuilder; } SMemRowBuilder;
@ -137,17 +137,17 @@ void destroyMemRowBuilder(SMemRowBuilder *pBuilder);
/** /**
* @brief * @brief
* *
* @param memRowType * @param rowType
* @param spd * @param spd
* @param idx the absolute bound index of columns * @param idx the absolute bound index of columns
* @return FORCE_INLINE * @return FORCE_INLINE
*/ */
static FORCE_INLINE void tscGetMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd, static FORCE_INLINE void tscGetSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, int32_t idx,
int32_t idx, int32_t *toffset, int16_t *colId) { int32_t *toffset, int16_t *colId) {
int32_t schemaIdx = 0; int32_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) { if (IS_DATA_COL_ORDERED(spd)) {
schemaIdx = spd->boundedColumns[idx]; schemaIdx = spd->boundColumns[idx];
if (isDataRowT(memRowType)) { if (isDataRowT(rowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
} else { } else {
*toffset = idx * sizeof(SColIdx); // the offset of SColIdx *toffset = idx * sizeof(SColIdx); // the offset of SColIdx
@ -155,7 +155,7 @@ static FORCE_INLINE void tscGetMemRowAppendInfo(SSchema *pSchema, uint8_t memRow
} else { } else {
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx); ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx; schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
if (isDataRowT(memRowType)) { if (isDataRowT(rowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; *toffset = (spd->cols + schemaIdx)->toffset;
} else { } else {
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx); *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);

View File

@ -428,7 +428,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) { for (int i = 0; i < spd->numOfBound; ++i) {
// the start position in data block buffer of current value in sql // the start position in data block buffer of current value in sql
int32_t colIndex = spd->boundedColumns[i]; int32_t colIndex = spd->boundColumns[i];
char *start = row + spd->cols[colIndex].offset; char *start = row + spd->cols[colIndex].offset;
@ -495,7 +495,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t toffset = -1; int32_t toffset = -1;
int16_t colId = -1; int16_t colId = -1;
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId); tscGetSTSRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
int32_t ret = int32_t ret =
tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId); tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId);
@ -630,7 +630,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
pColInfo->numOfCols = numOfCols; pColInfo->numOfCols = numOfCols;
pColInfo->numOfBound = numOfCols; pColInfo->numOfBound = numOfCols;
pColInfo->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode pColInfo->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t)); pColInfo->boundColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn)); pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
pColInfo->colIdxInfo = NULL; pColInfo->colIdxInfo = NULL;
pColInfo->flen = 0; pColInfo->flen = 0;
@ -656,7 +656,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
default: default:
break; break;
} }
pColInfo->boundedColumns[i] = i; pColInfo->boundColumns[i] = i;
} }
pColInfo->allNullLen += pColInfo->flen; pColInfo->allNullLen += pColInfo->flen;
pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen
@ -991,7 +991,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} }
for (int i = 0; i < spd.numOfBound; ++i) { for (int i = 0; i < spd.numOfBound; ++i) {
SSchema* pSchema = &pTagSchema[spd.boundedColumns[i]]; SSchema *pSchema = &pTagSchema[spd.boundColumns[i]];
index = 0; index = 0;
sToken = tStrGetToken(sql, &index, true); sToken = tStrGetToken(sql, &index, true);
@ -1158,7 +1158,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->numOfBound = 0; pColInfo->numOfBound = 0;
pColInfo->boundNullLen = 0; pColInfo->boundNullLen = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols); memset(pColInfo->boundColumns, 0, sizeof(int32_t) * nCols);
for (int32_t i = 0; i < nCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
pColInfo->cols[i].valStat = VAL_STAT_NONE; pColInfo->cols[i].valStat = VAL_STAT_NONE;
} }
@ -1205,7 +1205,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
} }
pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t; pColInfo->boundColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound; ++pColInfo->numOfBound;
switch (pSchema[t].type) { switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
@ -1239,7 +1239,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
} }
pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->cols[t].valStat = VAL_STAT_HAS;
pColInfo->boundedColumns[pColInfo->numOfBound] = t; pColInfo->boundColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound; ++pColInfo->numOfBound;
switch (pSchema[t].type) { switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
@ -1279,7 +1279,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
} }
SBoundIdxInfo *pColIdx = pColInfo->colIdxInfo; SBoundIdxInfo *pColIdx = pColInfo->colIdxInfo;
for (uint16_t i = 0; i < pColInfo->numOfBound; ++i) { for (uint16_t i = 0; i < pColInfo->numOfBound; ++i) {
pColIdx[i].schemaColIdx = (uint16_t)pColInfo->boundedColumns[i]; pColIdx[i].schemaColIdx = (uint16_t)pColInfo->boundColumns[i];
pColIdx[i].boundIdx = i; pColIdx[i].boundIdx = i;
} }
qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
@ -1289,7 +1289,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
} }
memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0, memset(&pColInfo->boundColumns[pColInfo->numOfBound], 0,
sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound)); sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -1554,7 +1554,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
} }
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
tfree(pColInfo->boundedColumns); tfree(pColInfo->boundColumns);
tfree(pColInfo->cols); tfree(pColInfo->cols);
tfree(pColInfo->colIdxInfo); tfree(pColInfo->colIdxInfo);
} }

View File

@ -59,12 +59,15 @@ extern "C" {
} while (0); } while (0);
// ----------------- TSDB COLUMN DEFINITION // ----------------- TSDB COLUMN DEFINITION
#pragma pack(push, 1)
typedef struct { typedef struct {
int8_t type; // Column type col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) int32_t type : 8; // column type
int16_t bytes; // column bytes (restore to int16_t in case of misuse) int32_t bytes : 24; // column bytes (restore to int32_t in case of misuse)
uint16_t offset; // point offset in STpRow after the header part. int32_t sma : 8; // block SMA: 0, no SMA, 1, sum/min/max, 2, ...
int32_t offset : 24; // point offset in STpRow after the header part.
} STColumn; } STColumn;
#pragma pack(pop)
#define colType(col) ((col)->type) #define colType(col) ((col)->type)
#define colColId(col) ((col)->colId) #define colColId(col) ((col)->colId)
@ -136,7 +139,7 @@ typedef struct {
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes); int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Semantic timestamp key definition // ----------------- Semantic timestamp key definition
@ -590,7 +593,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) { static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, int8_t type, const void *value) {
if (pBuilder->nCols >= pBuilder->tCols) { if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2; pBuilder->tCols *= 2;
SColIdx *pColIdx = (SColIdx *)taosMemoryRealloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); SColIdx *pColIdx = (SColIdx *)taosMemoryRealloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);

View File

@ -259,10 +259,10 @@ typedef struct {
} SSubmitRsp; } SSubmitRsp;
typedef struct SSchema { typedef struct SSchema {
int8_t type; int8_t type;
int32_t colId; col_id_t colId;
int32_t bytes; int32_t bytes;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
} SSchema; } SSchema;
typedef struct { typedef struct {
@ -438,8 +438,8 @@ typedef struct {
*/ */
typedef struct { typedef struct {
union { union {
int16_t colId; col_id_t colId;
int16_t slotId; int16_t slotId;
}; };
int16_t type; int16_t type;
@ -1902,7 +1902,7 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeFixedI32(buf, pSchema->colId); tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeString(buf, pSchema->name); tlen += taosEncodeString(buf, pSchema->name);
return tlen; return tlen;
} }
@ -1910,7 +1910,7 @@ static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) { static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI32(buf, &pSchema->bytes); buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI32(buf, &pSchema->colId); buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeStringTo(buf, pSchema->name); buf = taosDecodeStringTo(buf, pSchema->name);
return buf; return buf;
} }
@ -1918,7 +1918,7 @@ static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1; if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->colId) < 0) return -1; if (tEncodeI16(pEncoder, pSchema->colId) < 0) return -1;
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
return 0; return 0;
} }
@ -1926,7 +1926,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1; if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->colId) < 0) return -1; if (tDecodeI16(pDecoder, &pSchema->colId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
return 0; return 0;
} }

View File

@ -671,8 +671,9 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
* @param colIdx sorted column index, start from 0 * @param colIdx sorted column index, start from 0
* @return FORCE_INLINE * @return FORCE_INLINE
*/ */
static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, TDRowValT valType, static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colType,
const void *val, bool isCopyVarData, int32_t offset, int16_t colIdx) { TDRowValT valType, const void *val, bool isCopyVarData, int32_t offset,
col_id_t colIdx) {
STSRow *pRow = pBuilder->pBuf; STSRow *pRow = pBuilder->pBuf;
if (!val) { if (!val) {
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP

View File

@ -29,6 +29,7 @@ typedef uint32_t TDRowLenT;
typedef uint8_t TDRowValT; typedef uint8_t TDRowValT;
typedef int16_t col_id_t; typedef int16_t col_id_t;
typedef int8_t col_type_t; typedef int8_t col_type_t;
typedef int32_t col_bytes_t;
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {

View File

@ -56,7 +56,7 @@ typedef enum EColumnType {
typedef struct SColumnNode { typedef struct SColumnNode {
SExprNode node; // QUERY_NODE_COLUMN SExprNode node; // QUERY_NODE_COLUMN
uint64_t tableId; uint64_t tableId;
int16_t colId; col_id_t colId;
EColumnType colType; // column or tag EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];

View File

@ -43,10 +43,10 @@ typedef enum {
} ETaskType; } ETaskType;
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
int16_t numOfColumns; // the number of columns col_id_t numOfColumns; // the number of columns
int32_t rowSize; // row size of the schema int32_t rowSize; // row size of the schema
} STableComInfo; } STableComInfo;
typedef struct SIndexMeta { typedef struct SIndexMeta {
@ -171,7 +171,7 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
char *jobTaskStatusStr(int32_t status); char *jobTaskStatusStr(int32_t status);
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);

View File

@ -91,7 +91,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
} else { } else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchemas[0].colId); tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSTableMetaBatchRsp(&batchMetaRsp); tFreeSTableMetaBatchRsp(&batchMetaRsp);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }

View File

@ -106,12 +106,12 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
for (int i = 0; i < numOfCols; i++) { for (int i = 0; i < numOfCols; i++) {
int8_t type = 0; col_type_t type = 0;
int16_t colId = 0; col_id_t colId = 0;
int16_t bytes = 0; col_bytes_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type); buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI16(buf, &colId); buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI16(buf, &bytes); buf = taosDecodeFixedI32(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL; return NULL;
@ -148,7 +148,7 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->version = version; pBuilder->version = version;
} }
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) { int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, col_id_t colId, col_bytes_t bytes) {
if (!isValidDataType(type)) return -1; if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) { if (pBuilder->nCols >= pBuilder->tCols) {

View File

@ -299,14 +299,14 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols); tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols);
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
} }
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols); tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nTagCols);
for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name); tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
} }
@ -333,7 +333,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols); tlen += taosEncodeFixedU32(buf, pReq->ntbCfg.nCols);
for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) {
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type); tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].colId); tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
} }
@ -374,7 +374,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema)); pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) { for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].colId)); buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes)); buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
} }
@ -382,7 +382,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema)); pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) { for (uint32_t i = 0; i < pReq->stbCfg.nTagCols; i++) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type)); buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].colId); buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
} }
@ -422,7 +422,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema)); pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) { for (uint32_t i = 0; i < pReq->ntbCfg.nCols; i++) {
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type); buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].colId); buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
} }

View File

@ -98,7 +98,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i]; SSchema *pSchema = &pStb->pColumns[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
} }
@ -106,7 +106,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfTags; ++i) { for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i]; SSchema *pSchema = &pStb->pTags[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
} }
@ -114,7 +114,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfSmas; ++i) { for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
SSchema *pSchema = &pStb->pSmas[i]; SSchema *pSchema = &pStb->pSmas[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
} }
@ -185,7 +185,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i]; SSchema *pSchema = &pStb->pColumns[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER) SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
} }
@ -193,7 +193,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfTags; ++i) { for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i]; SSchema *pSchema = &pStb->pTags[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER) SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
} }
@ -201,7 +201,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfSmas; ++i) { for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
SSchema *pSchema = &pStb->pSmas[i]; SSchema *pSchema = &pStb->pSmas[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER) SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
} }

View File

@ -233,7 +233,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// save sma info // save sma info
int32_t len = tEncodeTSma(NULL, pSmaCfg); int32_t len = tEncodeTSma(NULL, pSmaCfg);
pBuf = taosMemoryCalloc(len, 1); pBuf = taosMemoryCalloc(1, len);
if (pBuf == NULL) { if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -285,7 +285,7 @@ static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;
tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI32(buf, pSchema->colId); tlen += taosEncodeFixedI16(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name); tlen += taosEncodeString(buf, pSchema->name);
} }
@ -301,7 +301,7 @@ static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
for (int i = 0; i < pSW->nCols; i++) { for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i; pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI32(buf, &pSchema->colId); buf = taosDecodeFixedI16(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes); buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name); buf = taosDecodeStringTo(buf, pSchema->name);
} }
@ -516,6 +516,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid); tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag); tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
} else if (pTbCfg->type == META_NORMAL_TABLE) { } else if (pTbCfg->type == META_NORMAL_TABLE) {
// TODO
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -538,6 +539,7 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid)); buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag)); buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
} else if (pTbCfg->type == META_NORMAL_TABLE) { } else if (pTbCfg->type == META_NORMAL_TABLE) {
// TODO
} else { } else {
ASSERT(0); ASSERT(0);
} }

View File

@ -130,8 +130,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t colNeed = 0; int32_t colNeed = 0;
while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) { while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta]; SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
int16_t colIdSchema = pColSchema->colId; col_id_t colIdSchema = pColSchema->colId;
int16_t colIdNeed = *(int16_t*)taosArrayGet(pHandle->pColIdList, colNeed); col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
if (colIdSchema < colIdNeed) { if (colIdSchema < colIdNeed) {
colMeta++; colMeta++;
} else if (colIdSchema > colIdNeed) { } else if (colIdSchema > colIdNeed) {
@ -159,7 +159,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int j = 0; int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) { for (int32_t i = 0; i < colNumNeed; i++) {
int16_t colId = *(int16_t*)taosArrayGet(pHandle->pColIdList, i); col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i);
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
j++; j++;
} }

View File

@ -1369,7 +1369,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
} }
} }
// Update pBlock membership vairables // Update pBlock membership variables
pBlock->last = isLast; pBlock->last = isLast;
pBlock->offset = offset; pBlock->offset = offset;
pBlock->algorithm = pCfg->compression; pBlock->algorithm = pCfg->compression;

View File

@ -255,7 +255,7 @@ int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) {
return 0; return 0;
} }
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta; // STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};

View File

@ -74,6 +74,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the request struct // TODO: maybe need to clear the request struct
taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pBSmaCols);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
taosMemoryFree(vCreateTbReq.dbFName);
taosMemoryFree(vCreateTbReq.name); taosMemoryFree(vCreateTbReq.name);
break; break;
} }
@ -102,13 +105,18 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
} }
taosMemoryFree(pCreateTbReq->name); taosMemoryFree(pCreateTbReq->name);
taosMemoryFree(pCreateTbReq->dbFName);
if (pCreateTbReq->type == TD_SUPER_TABLE) { if (pCreateTbReq->type == TD_SUPER_TABLE) {
taosMemoryFree(pCreateTbReq->stbCfg.pSchema); taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
taosMemoryFree(pCreateTbReq->stbCfg.pBSmaCols);
taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
} else if (pCreateTbReq->type == TD_CHILD_TABLE) { } else if (pCreateTbReq->type == TD_CHILD_TABLE) {
taosMemoryFree(pCreateTbReq->ctbCfg.pTag); taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
} else { } else {
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); taosMemoryFree(pCreateTbReq->ntbCfg.pSchema);
taosMemoryFree(pCreateTbReq->ntbCfg.pBSmaCols);
taosMemoryFree(pCreateTbReq->ntbCfg.pRSmaParam);
} }
} }
@ -135,6 +143,9 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
taosMemoryFree(vAlterTbReq.stbCfg.pSchema); taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pBSmaCols);
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
taosMemoryFree(vAlterTbReq.dbFName);
taosMemoryFree(vAlterTbReq.name); taosMemoryFree(vAlterTbReq.name);
break; break;
} }

View File

@ -172,7 +172,7 @@ void ctgDbgShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
int32_t colNum = c->numOfColumns + c->numOfTags; int32_t colNum = c->numOfColumns + c->numOfTags;
for (int32_t i = 0; i < colNum; ++i) { for (int32_t i = 0; i < colNum; ++i) {
SSchema *s = &p->schema[i]; SSchema *s = &p->schema[i];
ctgDebug("[%d] name:%s, type:%d, colId:%d, bytes:%d", i, s->name, s->type, s->colId, s->bytes); ctgDebug("[%d] name:%s, type:%d, colId:%" PRIi16 ", bytes:%d", i, s->name, s->type, s->colId, s->bytes);
} }
} }

View File

@ -41,26 +41,26 @@ typedef struct SBoundColumn {
} SBoundColumn; } SBoundColumn;
typedef struct { typedef struct {
uint16_t schemaColIdx; col_id_t schemaColIdx;
uint16_t boundIdx; col_id_t boundIdx;
uint16_t finalIdx; col_id_t finalIdx;
} SBoundIdxInfo; } SBoundIdxInfo;
typedef struct SParsedDataColInfo { typedef struct SParsedDataColInfo {
int16_t numOfCols; col_id_t numOfCols;
int16_t numOfBound; col_id_t numOfBound;
uint16_t flen; // TODO: get from STSchema uint16_t flen; // TODO: get from STSchema
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow) uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
uint16_t extendedVarLen; uint16_t extendedVarLen;
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part) uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
int32_t * boundedColumns; // bound column idx according to schema col_id_t *boundColumns; // bound column idx according to schema
SBoundColumn * cols; SBoundColumn *cols;
SBoundIdxInfo *colIdxInfo; SBoundIdxInfo *colIdxInfo;
int8_t orderStatus; // bound columns int8_t orderStatus; // bound columns
} SParsedDataColInfo; } SParsedDataColInfo;
typedef struct { typedef struct {
uint8_t memRowType; // default is 0, that is SDataRow uint8_t rowType; // default is 0, that is SDataRow
int32_t rowSize; int32_t rowSize;
} SMemRowBuilder; } SMemRowBuilder;
@ -92,11 +92,11 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1); (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
} }
static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, static FORCE_INLINE void getSTSRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx,
int32_t idx, int32_t *toffset, int32_t *colIdx) { int32_t *toffset, col_id_t *colIdx) {
int32_t schemaIdx = 0; col_id_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) { if (IS_DATA_COL_ORDERED(spd)) {
schemaIdx = spd->boundedColumns[idx] - PRIMARYKEY_TIMESTAMP_COL_ID; schemaIdx = spd->boundColumns[idx] - PRIMARYKEY_TIMESTAMP_COL_ID;
if (TD_IS_TP_ROW_T(rowType)) { if (TD_IS_TP_ROW_T(rowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
*colIdx = schemaIdx; *colIdx = schemaIdx;
@ -132,7 +132,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
int32_t schemaIdxCompar(const void *lhs, const void *rhs); int32_t schemaIdxCompar(const void *lhs, const void *rhs);
int32_t boundIdxCompar(const void *lhs, const void *rhs); int32_t boundIdxCompar(const void *lhs, const void *rhs);
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols); void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
void destroyBoundColumnInfo(SParsedDataColInfo* pColList); void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
void destroyBlockArrayList(SArray* pDataBlockList); void destroyBlockArrayList(SArray* pDataBlockList);
void destroyBlockHashmap(SHashObj* pDataBlockHash); void destroyBlockHashmap(SHashObj* pDataBlockHash);

View File

@ -600,9 +600,9 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
typedef struct SMemParam { typedef struct SMemParam {
SRowBuilder* rb; SRowBuilder* rb;
SSchema* schema; SSchema* schema;
int32_t toffset; int32_t toffset;
int32_t colIdx; col_id_t colIdx;
} SMemParam; } SMemParam;
static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* param) { static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* param) {
@ -623,9 +623,11 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
} else { } else {
if (value == NULL) { // it is a null data if (value == NULL) { // it is a null data
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset,
pa->colIdx);
} else { } else {
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset,
pa->colIdx);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -633,18 +635,18 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
// pSql -> tag1_name, ...) // pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) { static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
int32_t nCols = pColList->numOfCols; int16_t nCols = pColList->numOfCols;
pColList->numOfBound = 0; pColList->numOfBound = 0;
pColList->boundNullLen = 0; pColList->boundNullLen = 0;
memset(pColList->boundedColumns, 0, sizeof(int32_t) * nCols); memset(pColList->boundColumns, 0, sizeof(int16_t) * nCols);
for (int32_t i = 0; i < nCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
pColList->cols[i].valStat = VAL_STAT_NONE; pColList->cols[i].valStat = VAL_STAT_NONE;
} }
SToken sToken; SToken sToken;
bool isOrdered = true; bool isOrdered = true;
int32_t lastColIdx = -1; // last column found int16_t lastColIdx = -1; // last column found
while (1) { while (1) {
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
@ -652,8 +654,8 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
break; break;
} }
int32_t t = lastColIdx + 1; int16_t t = lastColIdx + 1;
int32_t index = findCol(&sToken, t, nCols, pSchema); int16_t index = findCol(&sToken, t, nCols, pSchema);
if (index < 0 && t > 0) { if (index < 0 && t > 0) {
index = findCol(&sToken, 0, t, pSchema); index = findCol(&sToken, 0, t, pSchema);
isOrdered = false; isOrdered = false;
@ -666,7 +668,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
} }
lastColIdx = index; lastColIdx = index;
pColList->cols[index].valStat = VAL_STAT_HAS; pColList->cols[index].valStat = VAL_STAT_HAS;
pColList->boundedColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID; pColList->boundColumns[pColList->numOfBound] = index + PRIMARYKEY_TIMESTAMP_COL_ID;
++pColList->numOfBound; ++pColList->numOfBound;
switch (pSchema[t].type) { switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
@ -689,18 +691,19 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SBoundIdxInfo* pColIdx = pColList->colIdxInfo; SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
for (uint16_t i = 0; i < pColList->numOfBound; ++i) { for (int16_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].schemaColIdx = (uint16_t)pColList->boundedColumns[i]; pColIdx[i].schemaColIdx = pColList->boundColumns[i];
pColIdx[i].boundIdx = i; pColIdx[i].boundIdx = i;
} }
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (uint16_t i = 0; i < pColList->numOfBound; ++i) { for (int16_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i; pColIdx[i].finalIdx = i;
} }
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
} }
memset(&pColList->boundedColumns[pColList->numOfBound], 0, sizeof(int32_t) * (pColList->numOfCols - pColList->numOfBound)); memset(&pColList->boundColumns[pColList->numOfBound], 0,
sizeof(int16_t) * (pColList->numOfCols - pColList->numOfBound));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -714,8 +717,8 @@ typedef struct SKvParam {
static int32_t KvRowAppend(const void *value, int32_t len, void *param) { static int32_t KvRowAppend(const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*) param; SKvParam* pa = (SKvParam*) param;
int32_t type = pa->schema->type; int8_t type = pa->schema->type;
int32_t colId = pa->schema->colId; int16_t colId = pa->schema->colId;
if (TSDB_DATA_TYPE_BINARY == type) { if (TSDB_DATA_TYPE_BINARY == type) {
STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len); STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
@ -747,7 +750,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
for (int i = 0; i < pCxt->tags.numOfBound; ++i) { for (int i = 0; i < pCxt->tags.numOfBound; ++i) {
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
SSchema* pSchema = &pTagsSchema[pCxt->tags.boundedColumns[i]]; SSchema* pSchema = &pTagsSchema[pCxt->tags.boundColumns[i]];
param.schema = pSchema; param.schema = pSchema;
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg)); CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
} }
@ -813,9 +816,9 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) { for (int i = 0; i < spd->numOfBound; ++i) {
NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken); NEXT_TOKEN_WITH_PREV(pCxt->pSql, sToken);
SSchema *pSchema = &schema[spd->boundedColumns[i] - 1]; SSchema* pSchema = &schema[spd->boundColumns[i] - 1];
param.schema = pSchema; param.schema = pSchema;
getMemRowAppendInfo(schema, pBuilder->rowType, spd, i, &param.toffset, &param.colIdx); getSTSRowAppendInfo(schema, pBuilder->rowType, spd, i, &param.toffset, &param.colIdx);
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg)); CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {

View File

@ -43,11 +43,11 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
} }
} }
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols) { void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
pColList->numOfCols = numOfCols; pColList->numOfCols = numOfCols;
pColList->numOfBound = numOfCols; pColList->numOfBound = numOfCols;
pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
pColList->boundedColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(int32_t)); pColList->boundColumns = taosMemoryCalloc(pColList->numOfCols, sizeof(col_id_t));
pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn)); pColList->cols = taosMemoryCalloc(pColList->numOfCols, sizeof(SBoundColumn));
pColList->colIdxInfo = NULL; pColList->colIdxInfo = NULL;
pColList->flen = 0; pColList->flen = 0;
@ -73,7 +73,7 @@ void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t
default: default:
break; break;
} }
pColList->boundedColumns[i] = pSchema[i].colId; pColList->boundColumns[i] = pSchema[i].colId;
} }
pColList->allNullLen += pColList->flen; pColList->allNullLen += pColList->flen;
pColList->boundNullLen = pColList->allNullLen; // default set allNullLen pColList->boundNullLen = pColList->allNullLen; // default set allNullLen
@ -103,7 +103,7 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs) {
} }
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) { void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
taosMemoryFreeClear(pColList->boundedColumns); taosMemoryFreeClear(pColList->boundColumns);
taosMemoryFreeClear(pColList->cols); taosMemoryFreeClear(pColList->cols);
taosMemoryFreeClear(pColList->colIdxInfo); taosMemoryFreeClear(pColList->colIdxInfo);
} }

View File

@ -1959,7 +1959,7 @@ typedef struct SVgroupTablesBatch {
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
} SVgroupTablesBatch; } SVgroupTablesBatch;
static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema) { static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) {
pSchema->colId = colId; pSchema->colId = colId;
pSchema->type = pCol->dataType.type; pSchema->type = pCol->dataType.type;
pSchema->bytes = pCol->dataType.bytes; pSchema->bytes = pCol->dataType.bytes;

View File

@ -87,7 +87,7 @@ private:
return meta_; return meta_;
} }
int32_t colId_; col_id_t colId_;
int32_t rowsize_; int32_t rowsize_;
std::shared_ptr<MockTableMeta> meta_; std::shared_ptr<MockTableMeta> meta_;
}; };

View File

@ -193,7 +193,7 @@ char *jobTaskStatusStr(int32_t status) {
return "UNKNOWN"; return "UNKNOWN";
} }
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
SSchema s = {0}; SSchema s = {0};
s.type = type; s.type = type;
s.bytes = bytes; s.bytes = bytes;

View File

@ -188,7 +188,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
} }
if (pMetaMsg->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pMetaMsg->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchemas[0].colId); qError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", pMetaMsg->pSchemas[0].colId);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }

View File

@ -173,7 +173,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
return -1; return -1;
} }
int32_t cols = 0; col_id_t cols = 0;
SSchema *pSchema = showRsp.tableMeta.pSchemas; SSchema *pSchema = showRsp.tableMeta.pSchemas;
const SSchema *s = tGetTbnameColumnSchema(); const SSchema *s = tGetTbnameColumnSchema();