diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bb14b28499..072db868d0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -89,18 +89,26 @@ typedef struct SBoundColumn { bool hasVal; // denote if current column has bound or not int32_t offset; // all column offset value } SBoundColumn; + typedef struct { uint16_t schemaColIdx; uint16_t boundIdx; uint16_t finalIdx; } SBoundIdxInfo; + +typedef enum _COL_ORDER_STATUS { + ORDER_STATUS_UNKNOWN = 0, + ORDER_STATUS_ORDERED = 1, + ORDER_STATUS_DISORDERED = 2, +} EOrderStatus; + typedef struct SParsedDataColInfo { - bool isOrdered; // bounded columns int16_t numOfCols; int16_t numOfBound; int32_t * boundedColumns; // bounded column idx according to schema SBoundColumn * cols; SBoundIdxInfo *colIdxInfo; + int8_t orderStatus; // bounded columns: } SParsedDataColInfo; typedef struct { @@ -409,6 +417,11 @@ extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); int16_t getNewResColId(SSqlCmd* pCmd); +int32_t schemaIdxCompar(const void *lhs, const void *rhs); +int32_t boundIdxCompar(const void *lhs, const void *rhs); +int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen); +int32_t getExtendedRowSize(STableComInfo *tinfo); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 3b21601caf..edf2421746 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -45,10 +45,10 @@ static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSiz static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema, char *str, char **end); -static FORCE_INLINE int32_t getExtendedRowSize(STableComInfo *tinfo) { +int32_t getExtendedRowSize(STableComInfo *tinfo) { return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_COL_HEAD_LEN * tinfo->numOfColumns; } -static int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) { +int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) { pHelper->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta if (pHelper->allNullLen == 0) { for (uint16_t i = 0; i < nCols; ++i) { @@ -56,9 +56,9 @@ static int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t int32_t typeLen = TYPE_BYTES[type]; pHelper->allNullLen += typeLen; if (TSDB_DATA_TYPE_BINARY == type) { - pHelper->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES); + pHelper->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES); } else if (TSDB_DATA_TYPE_NCHAR == type) { - int len = sizeof(VarDataLenT) + TSDB_NCHAR_SIZE; + int len = VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE; pHelper->allNullLen += len; } } @@ -867,14 +867,14 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i TDRowLenT kvRowColLen = 0; TDRowLenT colValAppended = 0; - if(!spd->isOrdered) { + if (spd->orderStatus == ORDER_STATUS_DISORDERED) { ASSERT(spd->colIdxInfo != NULL); if(!isPrimaryKey) { kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN); } else { ASSERT(spd->colIdxInfo[i].finalIdx == 0); } - } + } // the primary key locates in 1st column int32_t ret = tsParseOneColumnKV(pSchema, &sToken, payload, kvPrimaryKeyStart, kvStart, pInsertParam->msg, str, isPrimaryKey, timePrec, payloadValOffset + colValOffset, &colValAppended, @@ -891,7 +891,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i payloadColSetOffset(kvPrimaryKeyStart, colValOffset); } else { payloadColSetOffset(kvStart, colValOffset); - if(spd->isOrdered) { + if (spd->orderStatus == ORDER_STATUS_ORDERED) { kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column } } @@ -929,7 +929,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { return left > right ? 1 : -1; } } -static int32_t schemaIdxCompar(const void *lhs, const void *rhs) { +int32_t schemaIdxCompar(const void *lhs, const void *rhs) { uint16_t left = *(uint16_t *)lhs; uint16_t right = *(uint16_t *)rhs; @@ -940,7 +940,7 @@ static int32_t schemaIdxCompar(const void *lhs, const void *rhs) { } } -static int32_t boundIdxCompar(const void *lhs, const void *rhs) { +int32_t boundIdxCompar(const void *lhs, const void *rhs) { uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t)); uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t)); @@ -1020,7 +1020,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) { pColInfo->numOfCols = numOfCols; pColInfo->numOfBound = numOfCols; - pColInfo->isOrdered = true; + pColInfo->orderStatus = ORDER_STATUS_UNKNOWN; pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t)); pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn)); pColInfo->colIdxInfo = NULL; @@ -1606,7 +1606,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat } } - pColInfo->isOrdered = isOrdered; + pColInfo->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; if (!isOrdered) { pColInfo->colIdxInfo = tcalloc(pColInfo->numOfBound, sizeof(SBoundIdxInfo)); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 08d3cc599e..68fa2454d1 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -291,6 +291,7 @@ static char* normalStmtBuildSql(STscStmt* stmt) { return taosStringBuilderGetResult(&sb, NULL); } +#if 0 static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { SParsedDataColInfo* spd = &pBlock->boundColumnInfo; int32_t offset = 0; @@ -318,8 +319,135 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) { return TSDB_CODE_SUCCESS; } +#endif + +/** + * input: + * - schema: + * - payload: + * - spd: + * output: + * - pBlock with data block replaced by K-V format + */ +static int refactorPayload(STableDataBlocks* pBlock, int32_t rowNum) { + SParsedDataColInfo* spd = &pBlock->boundColumnInfo; + SSchema* schema = (SSchema*)pBlock->pTableMeta->schema; + SMemRowHelper* pHelper = &pBlock->rowHelper; + STableMeta* pTableMeta = pBlock->pTableMeta; + STableComInfo tinfo = tscGetTableInfo(pTableMeta); + int code = TSDB_CODE_SUCCESS; + int32_t extendedRowSize = getExtendedRowSize(&tinfo); + TDRowTLenT destPayloadSize = sizeof(SSubmitBlk); + + ASSERT(pHelper->allNullLen >= 8); + + TDRowTLenT destAllocSize = sizeof(SSubmitBlk) + rowNum * extendedRowSize; + SSubmitBlk* pDestBlock = tcalloc(destAllocSize, 1); + if (pDestBlock == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + memcpy(pDestBlock, pBlock->pData, sizeof(SSubmitBlk)); + char* destPayload = (char*)pDestBlock + sizeof(SSubmitBlk); + + char* srcPayload = (char*)pBlock->pData + sizeof(SSubmitBlk); + + for (int n = 0; n < rowNum; ++n) { + payloadSetNCols(destPayload, spd->numOfBound); + + TDRowTLenT dataRowLen = pHelper->allNullLen; + TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE + sizeof(SColIdx) * spd->numOfBound; + TDRowTLenT payloadValOffset = payloadValuesOffset(destPayload); // rely on payloadNCols + TDRowLenT colValOffset = 0; + + char* kvPrimaryKeyStart = destPayload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple + char* kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey + + for (int32_t i = 0; i < spd->numOfBound; ++i) { + int32_t colIndex = spd->boundedColumns[i]; + ASSERT(spd->cols[colIndex].hasVal); + char* start = srcPayload + spd->cols[colIndex].offset; + SSchema* pSchema = &schema[colIndex]; // get colId here + bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); + + // the primary key locates in 1st column + if (spd->orderStatus == ORDER_STATUS_DISORDERED) { + ASSERT(spd->colIdxInfo != NULL); + if (!isPrimaryKey) { + kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN); + } else { + ASSERT(spd->colIdxInfo[i].finalIdx == 0); + } + } + if (isPrimaryKey) { + payloadColSetId(kvPrimaryKeyStart, pSchema->colId); + payloadColSetType(kvPrimaryKeyStart, pSchema->type); + payloadColSetOffset(kvPrimaryKeyStart, colValOffset); + memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]); + colValOffset += TYPE_BYTES[pSchema->type]; + kvRowLen += TYPE_BYTES[pSchema->type]; + } else { + payloadColSetId(kvStart, pSchema->colId); + payloadColSetType(kvStart, pSchema->type); + payloadColSetOffset(kvStart, colValOffset); + if (IS_VAR_DATA_TYPE(pSchema->type)) { + varDataCopy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start); + colValOffset += varDataTLen(start); + kvRowLen += varDataTLen(start); + if (pSchema->type == TSDB_DATA_TYPE_BINARY) { + dataRowLen += (varDataLen(start) - CHAR_BYTES); + } else if (pSchema->type == TSDB_DATA_TYPE_NCHAR) { + dataRowLen += (varDataLen(start) - TSDB_NCHAR_SIZE); + } else { + ASSERT(0); + } + } else { + memcpy(POINTER_SHIFT(destPayload, payloadValOffset + colValOffset), start, TYPE_BYTES[pSchema->type]); + colValOffset += TYPE_BYTES[pSchema->type]; + kvRowLen += TYPE_BYTES[pSchema->type]; + } + + if (spd->orderStatus == ORDER_STATUS_ORDERED) { + kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column + } + } + } // end of column + + if (kvRowLen < dataRowLen) { + payloadSetType(destPayload, SMEM_ROW_KV); + } else { + payloadSetType(destPayload, SMEM_ROW_DATA); + } + + ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW); + + TDRowTLenT len = payloadValOffset + colValOffset; + payloadSetTLen(destPayload, len); + +#if 0 + TSKEY tsKey = payloadKey(destPayload); + ASSERT((tsKey < 1627747200000000 && tsKey > 1498838400000000) || (tsKey < 1627747200000 && tsKey > 1498838400000) || + (tsKey < 1627747200 && tsKey > 1498838400)); +#endif + + // next loop + srcPayload += pBlock->rowSize; + destPayload += len; + + destPayloadSize += len; + } // end of row + + ASSERT(destPayloadSize <= destAllocSize); + + tfree(pBlock->pData); + pBlock->pData = (char*)pDestBlock; + pBlock->nAllocSize = destAllocSize; + pBlock->size = destPayloadSize; + + return code; +} +#if 0 int32_t fillTablesColumnsNull(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; @@ -342,17 +470,98 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) { return TSDB_CODE_SUCCESS; } +#endif - - -//////////////////////////////////////////////////////////////////////////////// -// functions for insertion statement preparation -static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { - if (bind->is_null != NULL && *(bind->is_null)) { - setNull(data + param->offset, param->type, param->bytes); +/** + * check and sort + */ +static int initPayloadEnv(STableDataBlocks* pBlock, int32_t rowNum) { + SParsedDataColInfo* spd = &pBlock->boundColumnInfo; + if (spd->orderStatus != ORDER_STATUS_UNKNOWN) { return TSDB_CODE_SUCCESS; } + bool isOrdered = true; + int32_t lastColIdx = -1; + for (int32_t i = 0; i < spd->numOfBound; ++i) { + ASSERT(spd->cols[i].hasVal); + int32_t colIdx = spd->boundedColumns[i]; + if (isOrdered) { + if (lastColIdx > colIdx) { + isOrdered = false; + break; + } else { + lastColIdx = colIdx; + } + } + } + + spd->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED; + + if (isOrdered) { + spd->colIdxInfo = NULL; + } else { + spd->colIdxInfo = calloc(spd->numOfBound, sizeof(SBoundIdxInfo)); + if (spd->colIdxInfo == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + SBoundIdxInfo* pColIdx = spd->colIdxInfo; + for (uint16_t i = 0; i < spd->numOfBound; ++i) { + pColIdx[i].schemaColIdx = (uint16_t)spd->boundedColumns[i]; + pColIdx[i].boundIdx = i; + } + qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); + for (uint16_t i = 0; i < spd->numOfBound; ++i) { + pColIdx[i].finalIdx = i; + } + qsort(pColIdx, spd->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); + } + + return TSDB_CODE_SUCCESS; +} + +/** + * Refactor the raw payload structure to K-V format as the in tsParseOneRow() + */ +int32_t fillTablesPayload(SSqlObj* pSql) { + SSqlCmd* pCmd = &pSql->cmd; + int code = TSDB_CODE_SUCCESS; + + STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL); + + STableDataBlocks* pOneTableBlock = *p; + while (pOneTableBlock) { + SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData; + + if (pBlocks->numOfRows > 0) { + initSMemRowHelper(&pOneTableBlock->rowHelper, tscGetTableSchema(pOneTableBlock->pTableMeta), + tscGetNumOfColumns(pOneTableBlock->pTableMeta), 0); + if ((code = initPayloadEnv(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) { + return code; + } + if ((code = refactorPayload(pOneTableBlock, pBlocks->numOfRows)) != TSDB_CODE_SUCCESS) { + return code; + }; + } + + p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p); + if (p == NULL) { + break; + } + + pOneTableBlock = *p; + } + + return code; +} + //////////////////////////////////////////////////////////////////////////////// + // functions for insertion statement preparation + static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param, TAOS_BIND* bind, int32_t colNum) { + if (bind->is_null != NULL && *(bind->is_null)) { + setNull(data + param->offset, param->type, param->bytes); + return TSDB_CODE_SUCCESS; + } + #if 0 if (0) { // allow user bind param data with different type @@ -1106,9 +1315,12 @@ static int insertStmtExecute(STscStmt* stmt) { pBlk->uid = pTableMeta->id.uid; pBlk->tid = pTableMeta->id.tid; - fillTablesColumnsNull(stmt->pSql); + int code = fillTablesPayload(stmt->pSql); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); + code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1185,7 +1397,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) { return TSDB_CODE_TSC_APP_ERROR; } - fillTablesColumnsNull(pStmt->pSql); + fillTablesPayload(pStmt->pSql); if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) { return code;