refactor the payload to K_V format for prepare mode
This commit is contained in:
parent
889aa8a11b
commit
f24dca541f
|
@ -89,18 +89,26 @@ typedef struct SBoundColumn {
|
||||||
bool hasVal; // denote if current column has bound or not
|
bool hasVal; // denote if current column has bound or not
|
||||||
int32_t offset; // all column offset value
|
int32_t offset; // all column offset value
|
||||||
} SBoundColumn;
|
} SBoundColumn;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint16_t schemaColIdx;
|
uint16_t schemaColIdx;
|
||||||
uint16_t boundIdx;
|
uint16_t boundIdx;
|
||||||
uint16_t finalIdx;
|
uint16_t finalIdx;
|
||||||
} SBoundIdxInfo;
|
} SBoundIdxInfo;
|
||||||
|
|
||||||
|
typedef enum _COL_ORDER_STATUS {
|
||||||
|
ORDER_STATUS_UNKNOWN = 0,
|
||||||
|
ORDER_STATUS_ORDERED = 1,
|
||||||
|
ORDER_STATUS_DISORDERED = 2,
|
||||||
|
} EOrderStatus;
|
||||||
|
|
||||||
typedef struct SParsedDataColInfo {
|
typedef struct SParsedDataColInfo {
|
||||||
bool isOrdered; // bounded columns
|
|
||||||
int16_t numOfCols;
|
int16_t numOfCols;
|
||||||
int16_t numOfBound;
|
int16_t numOfBound;
|
||||||
int32_t * boundedColumns; // bounded column idx according to schema
|
int32_t * boundedColumns; // bounded column idx according to schema
|
||||||
SBoundColumn * cols;
|
SBoundColumn * cols;
|
||||||
SBoundIdxInfo *colIdxInfo;
|
SBoundIdxInfo *colIdxInfo;
|
||||||
|
int8_t orderStatus; // bounded columns:
|
||||||
} SParsedDataColInfo;
|
} SParsedDataColInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -409,6 +417,11 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
||||||
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
|
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
|
||||||
int16_t getNewResColId(SSqlCmd* pCmd);
|
int16_t getNewResColId(SSqlCmd* pCmd);
|
||||||
|
|
||||||
|
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
|
||||||
|
int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
||||||
|
int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen);
|
||||||
|
int32_t getExtendedRowSize(STableComInfo *tinfo);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -45,10 +45,10 @@ static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSiz
|
||||||
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
|
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
|
||||||
char *str, char **end);
|
char *str, char **end);
|
||||||
|
|
||||||
static FORCE_INLINE int32_t getExtendedRowSize(STableComInfo *tinfo) {
|
int32_t getExtendedRowSize(STableComInfo *tinfo) {
|
||||||
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_COL_HEAD_LEN * tinfo->numOfColumns;
|
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_COL_HEAD_LEN * tinfo->numOfColumns;
|
||||||
}
|
}
|
||||||
static int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) {
|
int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) {
|
||||||
pHelper->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta
|
pHelper->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta
|
||||||
if (pHelper->allNullLen == 0) {
|
if (pHelper->allNullLen == 0) {
|
||||||
for (uint16_t i = 0; i < nCols; ++i) {
|
for (uint16_t i = 0; i < nCols; ++i) {
|
||||||
|
@ -56,9 +56,9 @@ static int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t
|
||||||
int32_t typeLen = TYPE_BYTES[type];
|
int32_t typeLen = TYPE_BYTES[type];
|
||||||
pHelper->allNullLen += typeLen;
|
pHelper->allNullLen += typeLen;
|
||||||
if (TSDB_DATA_TYPE_BINARY == type) {
|
if (TSDB_DATA_TYPE_BINARY == type) {
|
||||||
pHelper->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES);
|
pHelper->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||||
} else if (TSDB_DATA_TYPE_NCHAR == type) {
|
} else if (TSDB_DATA_TYPE_NCHAR == type) {
|
||||||
int len = sizeof(VarDataLenT) + TSDB_NCHAR_SIZE;
|
int len = VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE;
|
||||||
pHelper->allNullLen += len;
|
pHelper->allNullLen += len;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -867,14 +867,14 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
||||||
TDRowLenT kvRowColLen = 0;
|
TDRowLenT kvRowColLen = 0;
|
||||||
TDRowLenT colValAppended = 0;
|
TDRowLenT colValAppended = 0;
|
||||||
|
|
||||||
if(!spd->isOrdered) {
|
if (spd->orderStatus == ORDER_STATUS_DISORDERED) {
|
||||||
ASSERT(spd->colIdxInfo != NULL);
|
ASSERT(spd->colIdxInfo != NULL);
|
||||||
if(!isPrimaryKey) {
|
if(!isPrimaryKey) {
|
||||||
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
|
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
|
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// the primary key locates in 1st column
|
// the primary key locates in 1st column
|
||||||
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, payload, kvPrimaryKeyStart, kvStart, pInsertParam->msg, str,
|
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, payload, kvPrimaryKeyStart, kvStart, pInsertParam->msg, str,
|
||||||
isPrimaryKey, timePrec, payloadValOffset + colValOffset, &colValAppended,
|
isPrimaryKey, timePrec, payloadValOffset + colValOffset, &colValAppended,
|
||||||
|
@ -891,7 +891,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
||||||
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
|
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
|
||||||
} else {
|
} else {
|
||||||
payloadColSetOffset(kvStart, colValOffset);
|
payloadColSetOffset(kvStart, colValOffset);
|
||||||
if(spd->isOrdered) {
|
if (spd->orderStatus == ORDER_STATUS_ORDERED) {
|
||||||
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
|
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -929,7 +929,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
|
||||||
return left > right ? 1 : -1;
|
return left > right ? 1 : -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
|
int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
|
||||||
uint16_t left = *(uint16_t *)lhs;
|
uint16_t left = *(uint16_t *)lhs;
|
||||||
uint16_t right = *(uint16_t *)rhs;
|
uint16_t right = *(uint16_t *)rhs;
|
||||||
|
|
||||||
|
@ -940,7 +940,7 @@ static int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t boundIdxCompar(const void *lhs, const void *rhs) {
|
int32_t boundIdxCompar(const void *lhs, const void *rhs) {
|
||||||
uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t));
|
uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t));
|
||||||
uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t));
|
uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t));
|
||||||
|
|
||||||
|
@ -1020,7 +1020,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
||||||
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
|
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
|
||||||
pColInfo->numOfCols = numOfCols;
|
pColInfo->numOfCols = numOfCols;
|
||||||
pColInfo->numOfBound = numOfCols;
|
pColInfo->numOfBound = numOfCols;
|
||||||
pColInfo->isOrdered = true;
|
pColInfo->orderStatus = ORDER_STATUS_UNKNOWN;
|
||||||
pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
|
pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
|
||||||
pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
|
pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
|
||||||
pColInfo->colIdxInfo = NULL;
|
pColInfo->colIdxInfo = NULL;
|
||||||
|
@ -1606,7 +1606,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pColInfo->isOrdered = isOrdered;
|
pColInfo->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
|
||||||
|
|
||||||
if (!isOrdered) {
|
if (!isOrdered) {
|
||||||
pColInfo->colIdxInfo = tcalloc(pColInfo->numOfBound, sizeof(SBoundIdxInfo));
|
pColInfo->colIdxInfo = tcalloc(pColInfo->numOfBound, sizeof(SBoundIdxInfo));
|
||||||
|
|
|
@ -291,6 +291,7 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
|
||||||
return taosStringBuilderGetResult(&sb, NULL);
|
return taosStringBuilderGetResult(&sb, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
|
static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
|
||||||
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
|
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
@ -318,8 +319,135 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/**
|
||||||
|
* input:
|
||||||
|
* - schema:
|
||||||
|
* - payload:
|
||||||
|
* - spd:
|
||||||
|
* output:
|
||||||
|
* - pBlock with data block replaced by K-V format
|
||||||
|
*/
|
||||||
|
static int refactorPayload(STableDataBlocks* pBlock, int32_t rowNum) {
|
||||||
|
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
|
||||||
|
SSchema* schema = (SSchema*)pBlock->pTableMeta->schema;
|
||||||
|
SMemRowHelper* pHelper = &pBlock->rowHelper;
|
||||||
|
STableMeta* pTableMeta = pBlock->pTableMeta;
|
||||||
|
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t extendedRowSize = getExtendedRowSize(&tinfo);
|
||||||
|
TDRowTLenT destPayloadSize = sizeof(SSubmitBlk);
|
||||||
|
|
||||||
|
ASSERT(pHelper->allNullLen >= 8);
|
||||||
|
|
||||||
|
TDRowTLenT destAllocSize = sizeof(SSubmitBlk) + rowNum * extendedRowSize;
|
||||||
|
SSubmitBlk* pDestBlock = tcalloc(destAllocSize, 1);
|
||||||
|
if (pDestBlock == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
memcpy(pDestBlock, pBlock->pData, sizeof(SSubmitBlk));
|
||||||
|
char* destPayload = (char*)pDestBlock + sizeof(SSubmitBlk);
|
||||||
|
|
||||||
|
char* srcPayload = (char*)pBlock->pData + sizeof(SSubmitBlk);
|
||||||
|
|
||||||
|
for (int n = 0; n < rowNum; ++n) {
|
||||||
|
payloadSetNCols(destPayload, spd->numOfBound);
|
||||||
|
|
||||||
|
TDRowTLenT dataRowLen = pHelper->allNullLen;
|
||||||
|
TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE + sizeof(SColIdx) * spd->numOfBound;
|
||||||
|
TDRowTLenT payloadValOffset = payloadValuesOffset(destPayload); // rely on payloadNCols
|
||||||
|
TDRowLenT colValOffset = 0;
|
||||||
|
|
||||||
|
char* kvPrimaryKeyStart = destPayload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
|
||||||
|
char* kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < spd->numOfBound; ++i) {
|
||||||
|
int32_t colIndex = spd->boundedColumns[i];
|
||||||
|
ASSERT(spd->cols[colIndex].hasVal);
|
||||||
|
char* start = srcPayload + spd->cols[colIndex].offset;
|
||||||
|
SSchema* pSchema = &schema[colIndex]; // get colId here
|
||||||
|
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||||
|
|
||||||
|
// the primary key locates in 1st column
|
||||||
|
if (spd->orderStatus == ORDER_STATUS_DISORDERED) {
|
||||||
|
ASSERT(spd->colIdxInfo != NULL);
|
||||||
|
if (!isPrimaryKey) {
|
||||||
|
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
|
||||||
|
} else {
|
||||||
|
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (isPrimaryKey) {
|
||||||
|
payloadColSetId(kvPrimaryKeyStart, pSchema->colId);
|
||||||
|
payloadColSetType(kvPrimaryKeyStart, pSchema->type);
|
||||||
|
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
|
||||||
|
memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]);
|
||||||
|
colValOffset += TYPE_BYTES[pSchema->type];
|
||||||
|
kvRowLen += TYPE_BYTES[pSchema->type];
|
||||||
|
} else {
|
||||||
|
payloadColSetId(kvStart, pSchema->colId);
|
||||||
|
payloadColSetType(kvStart, pSchema->type);
|
||||||
|
payloadColSetOffset(kvStart, colValOffset);
|
||||||
|
if (IS_VAR_DATA_TYPE(pSchema->type)) {
|
||||||
|
varDataCopy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start);
|
||||||
|
colValOffset += varDataTLen(start);
|
||||||
|
kvRowLen += varDataTLen(start);
|
||||||
|
if (pSchema->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
dataRowLen += (varDataLen(start) - CHAR_BYTES);
|
||||||
|
} else if (pSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
dataRowLen += (varDataLen(start) - TSDB_NCHAR_SIZE);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]);
|
||||||
|
colValOffset += TYPE_BYTES[pSchema->type];
|
||||||
|
kvRowLen += TYPE_BYTES[pSchema->type];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (spd->orderStatus == ORDER_STATUS_ORDERED) {
|
||||||
|
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} // end of column
|
||||||
|
|
||||||
|
if (kvRowLen < dataRowLen) {
|
||||||
|
payloadSetType(destPayload, SMEM_ROW_KV);
|
||||||
|
} else {
|
||||||
|
payloadSetType(destPayload, SMEM_ROW_DATA);
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW);
|
||||||
|
|
||||||
|
TDRowTLenT len = payloadValOffset + colValOffset;
|
||||||
|
payloadSetTLen(destPayload, len);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
TSKEY tsKey = payloadKey(destPayload);
|
||||||
|
ASSERT((tsKey < 1627747200000000 && tsKey > 1498838400000000) || (tsKey < 1627747200000 && tsKey > 1498838400000) ||
|
||||||
|
(tsKey < 1627747200 && tsKey > 1498838400));
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// next loop
|
||||||
|
srcPayload += pBlock->rowSize;
|
||||||
|
destPayload += len;
|
||||||
|
|
||||||
|
destPayloadSize += len;
|
||||||
|
} // end of row
|
||||||
|
|
||||||
|
ASSERT(destPayloadSize <= destAllocSize);
|
||||||
|
|
||||||
|
tfree(pBlock->pData);
|
||||||
|
pBlock->pData = (char*)pDestBlock;
|
||||||
|
pBlock->nAllocSize = destAllocSize;
|
||||||
|
pBlock->size = destPayloadSize;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
#if 0
|
||||||
int32_t fillTablesColumnsNull(SSqlObj* pSql) {
|
int32_t fillTablesColumnsNull(SSqlObj* pSql) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
|
@ -342,17 +470,98 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/**
|
||||||
|
* check and sort
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
*/
|
||||||
// functions for insertion statement preparation
|
static int initPayloadEnv(STableDataBlocks* pBlock, int32_t rowNum) {
|
||||||
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
|
SParsedDataColInfo* spd = &pBlock->boundColumnInfo;
|
||||||
if (bind->is_null != NULL && *(bind->is_null)) {
|
if (spd->orderStatus != ORDER_STATUS_UNKNOWN) {
|
||||||
setNull(data + param->offset, param->type, param->bytes);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool isOrdered = true;
|
||||||
|
int32_t lastColIdx = -1;
|
||||||
|
for (int32_t i = 0; i < spd->numOfBound; ++i) {
|
||||||
|
ASSERT(spd->cols[i].hasVal);
|
||||||
|
int32_t colIdx = spd->boundedColumns[i];
|
||||||
|
if (isOrdered) {
|
||||||
|
if (lastColIdx > colIdx) {
|
||||||
|
isOrdered = false;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
lastColIdx = colIdx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spd->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
|
||||||
|
|
||||||
|
if (isOrdered) {
|
||||||
|
spd->colIdxInfo = NULL;
|
||||||
|
} else {
|
||||||
|
spd->colIdxInfo = calloc(spd->numOfBound, sizeof(SBoundIdxInfo));
|
||||||
|
if (spd->colIdxInfo == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
SBoundIdxInfo* pColIdx = spd->colIdxInfo;
|
||||||
|
for (uint16_t i = 0; i < spd->numOfBound; ++i) {
|
||||||
|
pColIdx[i].schemaColIdx = (uint16_t)spd->boundedColumns[i];
|
||||||
|
pColIdx[i].boundIdx = i;
|
||||||
|
}
|
||||||
|
qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
|
||||||
|
for (uint16_t i = 0; i < spd->numOfBound; ++i) {
|
||||||
|
pColIdx[i].finalIdx = i;
|
||||||
|
}
|
||||||
|
qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Refactor the raw payload structure to K-V format as the in tsParseOneRow()
|
||||||
|
*/
|
||||||
|
int32_t fillTablesPayload(SSqlObj* pSql) {
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
int code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
|
||||||
|
|
||||||
|
STableDataBlocks* pOneTableBlock = *p;
|
||||||
|
while (pOneTableBlock) {
|
||||||
|
SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
|
||||||
|
|
||||||
|
if (pBlocks->numOfRows > 0) {
|
||||||
|
initSMemRowHelper(&pOneTableBlock->rowHelper, tscGetTableSchema(pOneTableBlock->pTableMeta),
|
||||||
|
tscGetNumOfColumns(pOneTableBlock->pTableMeta), 0);
|
||||||
|
if ((code = initPayloadEnv(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
if ((code = refactorPayload(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
|
||||||
|
if (p == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOneTableBlock = *p;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// functions for insertion statement preparation
|
||||||
|
static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) {
|
||||||
|
if (bind->is_null != NULL && *(bind->is_null)) {
|
||||||
|
setNull(data + param->offset, param->type, param->bytes);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
if (0) {
|
if (0) {
|
||||||
// allow user bind param data with different type
|
// allow user bind param data with different type
|
||||||
|
@ -1106,9 +1315,12 @@ static int insertStmtExecute(STscStmt* stmt) {
|
||||||
pBlk->uid = pTableMeta->id.uid;
|
pBlk->uid = pTableMeta->id.uid;
|
||||||
pBlk->tid = pTableMeta->id.tid;
|
pBlk->tid = pTableMeta->id.tid;
|
||||||
|
|
||||||
fillTablesColumnsNull(stmt->pSql);
|
int code = fillTablesPayload(stmt->pSql);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
|
code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1185,7 +1397,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_TSC_APP_ERROR;
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
fillTablesColumnsNull(pStmt->pSql);
|
fillTablesPayload(pStmt->pSql);
|
||||||
|
|
||||||
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
|
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue