Merge pull request #18498 from taosdata/refact/submit_req_wxy

enh: replace row format
This commit is contained in:
Xiaoyu Wang 2022-11-28 00:09:27 +08:00 committed by GitHub
commit 0ded55cb52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 548 additions and 313 deletions

View File

@ -2080,6 +2080,10 @@ int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
if (NULL == req) {
return;
}
taosMemoryFreeClear(req->name);
taosMemoryFreeClear(req->comment);
if (req->type == TSDB_CHILD_TABLE) {
@ -3246,6 +3250,7 @@ typedef struct {
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData);
void tDestroySSubmitReq2(SSubmitReq2* pReq);
#pragma pack(pop)

View File

@ -354,33 +354,33 @@ typedef struct SVgDataBlocks {
void* pData; // SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef void (*FFreeDataBlockHash)(SHashObj*);
typedef void (*FFreeDataBlockArray)(SArray*);
typedef void (*FFreeTableBlockHash)(SHashObj*);
typedef void (*FFreeVgourpBlockArray)(SArray*);
typedef struct SVnodeModifOpStmt {
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* pSql; // current sql statement position
int32_t totalRowsNum;
int32_t totalTbNum;
SName targetTableName;
SName usingTableName;
const char* pBoundCols;
struct STableMeta* pTableMeta;
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj;
SHashObj* pSubTableHashObj;
SHashObj* pTableNameHashObj;
SHashObj* pDbFNameHashObj;
SArray* pVgDataBlocks;
SVCreateTbReq createTblReq;
TdFilePtr fp;
FFreeDataBlockHash freeHashFunc;
FFreeDataBlockArray freeArrayFunc;
bool usingTableProcessing;
bool fileProcessing;
ENodeType nodeType;
ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* pSql; // current sql statement position
int32_t totalRowsNum;
int32_t totalTbNum;
SName targetTableName;
SName usingTableName;
const char* pBoundCols;
struct STableMeta* pTableMeta;
SHashObj* pVgroupsHashObj;
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
SHashObj* pSubTableHashObj;
SHashObj* pTableNameHashObj;
SHashObj* pDbFNameHashObj;
SArray* pVgDataBlocks; // SArray<SVgroupDataCxt*>
SVCreateTbReq* pCreateTblReq;
TdFilePtr fp;
FFreeTableBlockHash freeHashFunc;
FFreeVgourpBlockArray freeArrayFunc;
bool usingTableProcessing;
bool fileProcessing;
} SVnodeModifOpStmt;
typedef struct SExplainOptions {

View File

@ -6830,6 +6830,10 @@ _exit:
return 0;
}
void tDestroySSubmitTbData(SSubmitTbData *pTbData) {
// todo
}
void tDestroySSubmitReq2(SSubmitReq2 *pReq) {
if (NULL == pReq) return;

View File

@ -807,7 +807,8 @@ void nodesDestroyNode(SNode* pNode) {
if (pStmt->freeArrayFunc) {
pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
}
tdDestroySVCreateTbReq(&pStmt->createTblReq);
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
taosMemoryFreeClear(pStmt->pCreateTblReq);
taosCloseFile(&pStmt->fp);
break;
}

View File

@ -20,8 +20,6 @@
struct SToken;
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
@ -37,6 +35,8 @@ struct SToken;
} \
} while (0)
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
typedef enum EOrderStatus {
ORDER_STATUS_UNKNOWN = 0,
ORDER_STATUS_ORDERED = 1,
@ -141,4 +141,34 @@ int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
typedef struct SBoundColInfo {
int32_t *pColIndex; // bound index => schema index
int32_t numOfCols;
int32_t numOfBound;
} SBoundColInfo;
typedef struct STableDataCxt {
STableMeta *pMeta;
STSchema *pSchema;
SBoundColInfo boundColsInfo;
SArray *pValues;
SVCreateTbReq *pCreateTblReq;
SSubmitTbData *pData;
} STableDataCxt;
typedef struct SVgroupDataCxt {
int32_t vgId;
SSubmitReq2 *pData;
} SVgroupDataCxt;
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash);
#endif // TDENGINE_PAR_INSERT_UTIL_H

View File

@ -38,12 +38,12 @@
} while (TK_NK_SPACE == (token).type)
typedef struct SInsertParseContext {
SParseContext* pComCxt;
SMsgBuf msg;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
SParsedDataColInfo tags; // for stmt
bool missCache;
bool usingDuplicateTable;
SParseContext* pComCxt;
SMsgBuf msg;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
SBoundColInfo tags; // for stmt
bool missCache;
bool usingDuplicateTable;
} SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@ -172,21 +172,19 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifO
}
// pStmt->pSql -> field1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags,
SParsedDataColInfo* pColList, SSchema* pSchema) {
col_id_t nCols = pColList->numOfCols;
pColList->numOfBound = 0;
pColList->boundNullLen = 0;
memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
for (col_id_t i = 0; i < nCols; ++i) {
pColList->cols[i].valStat = VAL_STAT_NONE;
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, bool isTags, SSchema* pSchema,
SBoundColInfo* pBoundInfo) {
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SToken token;
bool isOrdered = true;
col_id_t lastColIdx = -1; // last column found
while (1) {
pBoundInfo->numOfBound = 0;
int32_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
while (TSDB_CODE_SUCCESS == code) {
SToken token;
NEXT_TOKEN(*pSql, token);
if (TK_NK_RP == token.type) {
@ -198,64 +196,30 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
token.z = tmpTokenBuf;
token.n = strdequote(token.z);
col_id_t t = lastColIdx + 1;
col_id_t index = insFindCol(&token, t, nCols, pSchema);
int32_t t = lastColIdx + 1;
int32_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pSchema);
isOrdered = false;
}
if (index < 0) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z);
}
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
return buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z);
}
lastColIdx = index;
pColList->cols[index].valStat = VAL_STAT_HAS;
pColList->boundColumns[pColList->numOfBound] = index;
++pColList->numOfBound;
switch (pSchema[t].type) {
case TSDB_DATA_TYPE_BINARY:
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
break;
case TSDB_DATA_TYPE_NCHAR:
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
break;
default:
pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
break;
code = generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_INVALID_COLUMN, token.z);
} else if (pUseCols[index]) {
code = buildSyntaxErrMsg(&pCxt->msg, "duplicated column name", token.z);
} else {
lastColIdx = index;
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
}
}
if (!isTags && pColList->cols[0].valStat == VAL_STAT_NONE) {
return buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
if (TSDB_CODE_SUCCESS == code && !isTags && !pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
}
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
taosMemoryFree(pUseCols);
if (!isOrdered) {
pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
if (NULL == pColList->colIdxInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].schemaColIdx = pColList->boundColumns[i];
pColIdx[i].boundIdx = i;
}
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar);
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i;
}
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar);
}
if (pColList->numOfCols > pColList->numOfBound) {
memset(&pColList->boundColumns[pColList->numOfBound], 0,
sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
}
return TSDB_CODE_SUCCESS;
return code;
}
static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t* time, SMsgBuf* pMsgBuf) {
@ -518,8 +482,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
// input pStmt->pSql: [(tag1_name, ...)] TAGS (tag1_value, ...) ...
// output pStmt->pSql: TAGS (tag1_value, ...) ...
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
SSchema* pTagsSchema = getTableTagSchema(pStmt->pTableMeta);
insSetBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pStmt->pTableMeta));
insInitBoundColsInfo(getNumOfTags(pStmt->pTableMeta), &pCxt->tags);
SToken token;
int32_t index = 0;
@ -529,7 +492,7 @@ static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt
}
pStmt->pSql += index;
return parseBoundColumns(pCxt, &pStmt->pSql, true, &pCxt->tags, pTagsSchema);
return parseBoundColumns(pCxt, &pStmt->pSql, true, getTableTagSchema(pStmt->pTableMeta), &pCxt->tags);
}
static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SSchema* pTagSchema, SToken* pToken,
@ -561,7 +524,7 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt
}
static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) {
insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
insBuildCreateTbReq(pStmt->pCreateTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags);
}
@ -616,7 +579,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt*
break;
}
SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]];
SSchema* pTagSchema = &pSchema[pCxt->tags.pColIndex[i]];
isJson = pTagSchema->type == TSDB_DATA_TYPE_JSON;
code = checkAndTrimValue(&token, pCxt->tmpTokenBuf, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code) {
@ -697,8 +660,8 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* p
if (TK_NK_INTEGER != token.type) {
return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
}
pStmt->createTblReq.ttl = taosStr2Int32(token.z, NULL, 10);
if (pStmt->createTblReq.ttl < 0) {
pStmt->pCreateTblReq->ttl = taosStr2Int32(token.z, NULL, 10);
if (pStmt->pCreateTblReq->ttl < 0) {
return buildSyntaxErrMsg(&pCxt->msg, "Invalid option ttl", token.z);
}
} else if (TK_COMMENT == token.type) {
@ -711,11 +674,11 @@ static int32_t parseTableOptions(SInsertParseContext* pCxt, SVnodeModifOpStmt* p
return buildSyntaxErrMsg(&pCxt->msg, "comment too long", token.z);
}
int32_t len = trimString(token.z, token.n, pCxt->tmpTokenBuf, TSDB_TB_COMMENT_LEN);
pStmt->createTblReq.comment = strndup(pCxt->tmpTokenBuf, len);
if (NULL == pStmt->createTblReq.comment) {
pStmt->pCreateTblReq->comment = strndup(pCxt->tmpTokenBuf, len);
if (NULL == pStmt->pCreateTblReq->comment) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pStmt->createTblReq.commentLen = len;
pStmt->pCreateTblReq->commentLen = len;
} else {
break;
}
@ -916,27 +879,22 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif
return skipParentheses(pCxt, &pStmt->pSql);
}
static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks** pDataBuf) {
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) {
if (pCxt->pComCxt->async) {
uint64_t uid = pStmt->pTableMeta->uid;
if (pStmt->usingTableProcessing) {
pStmt->pTableMeta->uid = 0;
}
return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid),
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL,
&pStmt->createTblReq);
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), pStmt->pTableMeta,
&pStmt->pCreateTblReq, pTableCxt);
}
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStmt->targetTableName, tbFName);
return insGetDataBlockFromList(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta,
pDataBuf, NULL, &pStmt->createTblReq);
return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
&pStmt->pCreateTblReq, pTableCxt);
}
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
STableDataBlocks* pDataBuf) {
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) {
SToken token;
int32_t index = 0;
NEXT_TOKEN_KEEP_SQL(pStmt->pSql, token, index);
@ -946,13 +904,13 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
}
// pStmt->pSql -> field1_name, ...)
return parseBoundColumns(pCxt, &pStmt->pSql, false, &pDataBuf->boundColumnInfo,
getTableColumnSchema(pStmt->pTableMeta));
return parseBoundColumns(pCxt, &pStmt->pSql, false, getTableColumnSchema(pStmt->pTableMeta),
&pTableCxt->boundColsInfo);
}
if (NULL != pStmt->pBoundCols) {
return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, &pDataBuf->boundColumnInfo,
getTableColumnSchema(pStmt->pTableMeta));
return parseBoundColumns(pCxt, &pStmt->pBoundCols, false, getTableColumnSchema(pStmt->pTableMeta),
&pTableCxt->boundColsInfo);
}
return TSDB_CODE_SUCCESS;
@ -962,14 +920,13 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS
// 1. [(tag1_name, ...)] ...
// 2. VALUES ... | FILE ...
// output pStmt->pSql: VALUES ... | FILE ...
static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
STableDataBlocks** pDataBuf) {
static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) {
int32_t code = parseUsingClauseBottom(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = getTableDataBlocks(pCxt, pStmt, pDataBuf);
code = getTableDataCxt(pCxt, pStmt, pTableCxt);
}
if (TSDB_CODE_SUCCESS == code) {
code = parseBoundColumnsClause(pCxt, pStmt, *pDataBuf);
code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt);
}
return code;
}
@ -992,108 +949,88 @@ static int32_t parseSchemaClauseTop(SInsertParseContext* pCxt, SVnodeModifOpStmt
}
static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
int16_t timePrec, _row_append_fn_t func, void* param) {
int64_t iv;
uint64_t uv;
char* endptr = NULL;
int16_t timePrec, SColVal* pVal) {
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: {
if ((pToken->type == TK_NK_BOOL || pToken->type == TK_NK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
return func(&pCxt->msg, &TRUE_VALUE, pSchema->bytes, param);
pVal->value.val = TRUE_VALUE;
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
return func(&pCxt->msg, &FALSE_VALUE, pSchema->bytes, param);
pVal->value.val = FALSE_VALUE;
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_NK_INTEGER) {
return func(&pCxt->msg, ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE),
pSchema->bytes, param);
pVal->value.val = ((taosStr2Int64(pToken->z, NULL, 10) == 0) ? FALSE_VALUE : TRUE_VALUE);
} else if (pToken->type == TK_NK_FLOAT) {
return func(&pCxt->msg, ((taosStr2Double(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes,
param);
pVal->value.val = ((taosStr2Double(pToken->z, NULL) == 0) ? FALSE_VALUE : TRUE_VALUE);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
} else if (!IS_VALID_TINYINT(pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
} else if (uv > UINT8_MAX) {
} else if (pVal->value.val > UINT8_MAX) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)uv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
} else if (!IS_VALID_SMALLINT(pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
} else if (uv > UINT16_MAX) {
} else if (pVal->value.val > UINT16_MAX) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
}
uint16_t tmpVal = (uint16_t)uv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
} else if (!IS_VALID_INT(pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
}
int32_t tmpVal = (int32_t)iv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
} else if (uv > UINT32_MAX) {
} else if (pVal->value.val > UINT32_MAX) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
}
uint32_t tmpVal = (uint32_t)uv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv)) {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
}
return func(&pCxt->msg, &iv, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &uv)) {
if (TSDB_CODE_SUCCESS != toUInteger(pToken->z, pToken->n, 10, &pVal->value.val)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
}
return func(&pCxt->msg, &uv, pSchema->bytes, param);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
char* endptr = NULL;
double dv;
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
@ -1102,11 +1039,11 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
float tmpVal = (float)dv;
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
pVal->value.val = *(int64_t*)&dv;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* endptr = NULL;
double dv;
if (TK_NK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
@ -1114,49 +1051,68 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
return func(&pCxt->msg, &dv, pSchema->bytes, param);
pVal->value.val = *(int64_t*)&dv;
break;
}
case TSDB_DATA_TYPE_BINARY: {
// Too long values will raise the invalid sql error message
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}
return func(&pCxt->msg, pToken->z, pToken->n, param);
pVal->value.pData = pToken->z;
pVal->value.nData = pToken->n;
break;
}
case TSDB_DATA_TYPE_NCHAR: {
return func(&pCxt->msg, pToken->z, pToken->n, param);
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t len = 0;
char* pUcs4 = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE);
if (NULL == pUcs4) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (!taosMbsToUcs4(pToken->z, pToken->n, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) {
return generateSyntaxErrMsg(&pCxt->msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name);
}
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z);
}
pVal->value.pData = pUcs4;
pVal->value.nData = len;
break;
}
case TSDB_DATA_TYPE_JSON: {
if (pToken->n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", pToken->z);
}
return func(&pCxt->msg, pToken->z, pToken->n, param);
pVal->value.pData = pToken->z;
pVal->value.nData = pToken->n;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t tmpVal;
if (parseTime(pSql, pToken, timePrec, &tmpVal, &pCxt->msg) != TSDB_CODE_SUCCESS) {
if (parseTime(pSql, pToken, timePrec, &pVal->value.val, &pCxt->msg) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
}
return func(&pCxt->msg, &tmpVal, pSchema->bytes, param);
break;
}
default:
return TSDB_CODE_FAILED;
}
return TSDB_CODE_FAILED;
pVal->flag = CV_FLAG_VALUE;
return TSDB_CODE_SUCCESS;
}
static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, SToken* pToken, SSchema* pSchema,
int16_t timePrec, _row_append_fn_t func, void* param) {
int16_t timePrec, SColVal* pVal) {
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z);
}
return func(&pCxt->msg, NULL, 0, param);
pVal->flag = CV_FLAG_NULL;
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code && IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
@ -1164,26 +1120,24 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo
}
if (TSDB_CODE_SUCCESS == code) {
code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, timePrec, func, param);
code = parseValueTokenImpl(pCxt, pSql, pToken, pSchema, timePrec, pVal);
}
return code;
}
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataBlocks* pDataBuf, bool* pGotRow,
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) {
SRowBuilder* pBuilder = &pDataBuf->rowBuilder;
STSRow* row = (STSRow*)(pDataBuf->pData + pDataBuf->size); // skip the SSubmitBlk header
SParsedDataColInfo* pCols = &pDataBuf->boundColumnInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pDataBuf->pTableMeta);
SMemParam param = {.rb = pBuilder};
SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
int32_t code = tdSRowResetBuf(pBuilder, row);
int32_t code = TSDB_CODE_SUCCESS;
// 1. set the parsed value from sql string
for (int i = 0; i < pCols->numOfBound && TSDB_CODE_SUCCESS == code; ++i) {
NEXT_TOKEN_WITH_PREV(*pSql, *pToken);
SSchema* pSchema = &pSchemas[pCols->boundColumns[i]];
SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pCols->pColIndex[i]);
if (pToken->type == TK_NK_QUESTION) {
isParseBindParam = true;
@ -1202,10 +1156,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB
}
if (TSDB_CODE_SUCCESS == code) {
param.schema = pSchema;
insGetSTSRowAppendInfo(pBuilder->rowType, pCols, i, &param.toffset, &param.colIdx);
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pDataBuf->pTableMeta).precision, insMemRowAppend,
&param);
code = parseValueToken(pCxt, pSql, pToken, pSchema, getTableInfo(pTableCxt->pMeta).precision, pVal);
}
if (TSDB_CODE_SUCCESS == code && i < pCols->numOfBound - 1) {
@ -1217,64 +1168,22 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB
}
if (TSDB_CODE_SUCCESS == code) {
TSKEY tsKey = TD_ROW_KEY(row);
code = insCheckTimestamp(pDataBuf, (const char*)&tsKey);
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
}
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
// set the null value for the columns that do not assign values
if ((pCols->numOfBound < pCols->numOfCols) && TD_IS_TP_ROW(row)) {
pBuilder->hasNone = true;
}
tdSRowEnd(pBuilder);
*pGotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tBuildTSchema(schema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
#endif
}
return code;
}
static int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
// expand the allocated size
if (remain < rowSize * factor) {
while (remain < rowSize * factor) {
pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
remain = pDataBlock->nAllocSize - pDataBlock->size;
}
char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
if (tmp != NULL) {
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
// do nothing, if allocate more memory failed
pDataBlock->nAllocSize = nAllocSizeOld;
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
return TSDB_CODE_SUCCESS;
}
// pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf,
int32_t maxRows, int32_t* pNumOfRows, SToken* pToken) {
int32_t code = insInitRowBuilder(&pDataBuf->rowBuilder, pDataBuf->pTableMeta->sversion, &pDataBuf->boundColumnInfo);
static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt,
int32_t* pNumOfRows, SToken* pToken) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t extendedRowSize = insGetExtendedRowSize(pDataBuf);
(*pNumOfRows) = 0;
while (TSDB_CODE_SUCCESS == code) {
int32_t index = 0;
@ -1284,13 +1193,9 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
}
pStmt->pSql += index;
if ((*pNumOfRows) >= maxRows || pDataBuf->size + extendedRowSize >= pDataBuf->nAllocSize) {
code = allocateMemIfNeed(pDataBuf, extendedRowSize, &maxRows);
}
bool gotRow = false;
if (TSDB_CODE_SUCCESS == code) {
code = parseOneRow(pCxt, &pStmt->pSql, pDataBuf, &gotRow, pToken);
code = parseOneRow(pCxt, &pStmt->pSql, pTableCxt, &gotRow, pToken);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1303,7 +1208,6 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
}
if (TSDB_CODE_SUCCESS == code && gotRow) {
pDataBuf->size += extendedRowSize;
(*pNumOfRows)++;
}
}
@ -1316,19 +1220,11 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
}
// VALUES (field1_value, ...) [(field1_value2, ...) ...]
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf,
static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt,
SToken* pToken) {
int32_t maxNumOfRows = 0;
int32_t numOfRows = 0;
int32_t code = allocateMemIfNeed(pDataBuf, insGetExtendedRowSize(pDataBuf), &maxNumOfRows);
int32_t code = parseValues(pCxt, pStmt, pTableCxt, &numOfRows, pToken);
if (TSDB_CODE_SUCCESS == code) {
code = parseValues(pCxt, pStmt, pDataBuf, maxNumOfRows, &numOfRows, pToken);
}
if (TSDB_CODE_SUCCESS == code) {
code = insSetBlockInfo((SSubmitBlk*)(pDataBuf->pData), pDataBuf, numOfRows, &pCxt->msg);
}
if (TSDB_CODE_SUCCESS == code) {
pDataBuf->numOfTables = 1;
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
@ -1336,11 +1232,9 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* p
return code;
}
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf,
int maxRows, int32_t* pNumOfRows) {
int32_t code = insInitRowBuilder(&pDataBuf->rowBuilder, pDataBuf->pTableMeta->sversion, &pDataBuf->boundColumnInfo);
int32_t extendedRowSize = insGetExtendedRowSize(pDataBuf);
static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt,
int32_t* pNumOfRows) {
int32_t code = TSDB_CODE_SUCCESS;
(*pNumOfRows) = 0;
char* pLine = NULL;
int64_t readLen = 0;
@ -1354,24 +1248,19 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
continue;
}
if ((*pNumOfRows) >= maxRows || pDataBuf->size + extendedRowSize >= pDataBuf->nAllocSize) {
code = allocateMemIfNeed(pDataBuf, extendedRowSize, &maxRows);
}
bool gotRow = false;
if (TSDB_CODE_SUCCESS == code) {
SToken token;
strtolower(pLine, pLine);
const char* pRow = pLine;
code = parseOneRow(pCxt, (const char**)&pRow, pDataBuf, &gotRow, &token);
code = parseOneRow(pCxt, (const char**)&pRow, pTableCxt, &gotRow, &token);
}
if (TSDB_CODE_SUCCESS == code && gotRow) {
pDataBuf->size += extendedRowSize;
(*pNumOfRows)++;
}
if (TSDB_CODE_SUCCESS == code && pDataBuf->nAllocSize > tsMaxMemUsedByInsert * 1024 * 1024) {
if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) > tsMaxMemUsedByInsert * 1024 * 1024) {
pStmt->fileProcessing = true;
break;
}
@ -1385,18 +1274,10 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
return code;
}
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) {
int32_t maxNumOfRows = 0;
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) {
int32_t numOfRows = 0;
int32_t code = allocateMemIfNeed(pDataBuf, insGetExtendedRowSize(pDataBuf), &maxNumOfRows);
int32_t code = parseCsvFile(pCxt, pStmt, pTableCxt, &numOfRows);
if (TSDB_CODE_SUCCESS == code) {
code = parseCsvFile(pCxt, pStmt, pDataBuf, maxNumOfRows, &numOfRows);
}
if (TSDB_CODE_SUCCESS == code) {
code = insSetBlockInfo((SSubmitBlk*)(pDataBuf->pData), pDataBuf, numOfRows, &pCxt->msg);
}
if (TSDB_CODE_SUCCESS == code) {
pDataBuf->numOfTables = 1;
pStmt->totalRowsNum += numOfRows;
pStmt->totalTbNum += 1;
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT);
@ -1410,7 +1291,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifOpStm
}
static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, SToken* pFilePath,
STableDataBlocks* pDataBuf) {
STableDataCxt* pTableCxt) {
char filePathStr[TSDB_FILENAME_LEN] = {0};
if (TK_NK_STRING == pFilePath->type) {
trimString(pFilePath->z, pFilePath->n, filePathStr, sizeof(filePathStr));
@ -1422,27 +1303,27 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* p
return TAOS_SYSTEM_ERROR(errno);
}
return parseDataFromFileImpl(pCxt, pStmt, pDataBuf);
return parseDataFromFileImpl(pCxt, pStmt, pTableCxt);
}
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf,
static int32_t parseFileClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt,
SToken* pToken) {
NEXT_TOKEN(pStmt->pSql, *pToken);
if (0 == pToken->n || (TK_NK_STRING != pToken->type && TK_NK_ID != pToken->type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", pToken->z);
}
return parseDataFromFile(pCxt, pStmt, pToken, pDataBuf);
return parseDataFromFile(pCxt, pStmt, pToken, pTableCxt);
}
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataBlocks* pDataBuf) {
static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) {
SToken token;
NEXT_TOKEN(pStmt->pSql, token);
switch (token.type) {
case TK_VALUES:
return parseValuesClause(pCxt, pStmt, pDataBuf, &token);
return parseValuesClause(pCxt, pStmt, pTableCxt, &token);
case TK_FILE:
return parseFileClause(pCxt, pStmt, pDataBuf, &token);
return parseFileClause(pCxt, pStmt, pTableCxt, &token);
default:
break;
}
@ -1453,18 +1334,19 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pSt
// 1. [(tag1_name, ...)] ...
// 2. VALUES ... | FILE ...
static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
STableDataBlocks* pDataBuf = NULL;
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pDataBuf);
STableDataCxt* pTableCxt = NULL;
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
if (TSDB_CODE_SUCCESS == code) {
code = parseDataClause(pCxt, pStmt, pDataBuf);
code = parseDataClause(pCxt, pStmt, pTableCxt);
}
return code;
}
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
destroyBoundColumnInfo(&pCxt->tags);
// destroyBoundColumnInfo(&pCxt->tags);
taosMemoryFreeClear(pStmt->pTableMeta);
tdDestroySVCreateTbReq(&pStmt->createTblReq);
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
taosMemoryFreeClear(pStmt->pCreateTblReq);
pCxt->missCache = false;
pCxt->usingDuplicateTable = false;
pStmt->pBoundCols = NULL;
@ -1522,6 +1404,7 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
}
static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
#if 0
SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
if (NULL == tags) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -1538,6 +1421,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
pStmt->pVgroupsHashObj = NULL;
pStmt->pTableBlockHashObj = NULL;
return code;
#else
return TSDB_CODE_FAILED;
#endif
}
static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
@ -1546,13 +1432,11 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStm
}
// merge according to vgId
int32_t code = TSDB_CODE_SUCCESS;
if (taosHashGetSize(pStmt->pTableBlockHashObj) > 0) {
code = insMergeTableDataBlocks(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
}
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
if (TSDB_CODE_SUCCESS == code) {
code = insBuildOutput(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
}
return code;
}
@ -1593,8 +1477,8 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S
TSDB_QUERY_SET_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT);
}
pStmt->pSql = pCxt->pComCxt->pSql;
pStmt->freeHashFunc = insDestroyBlockHashmap;
pStmt->freeArrayFunc = insDestroyBlockArrayList;
pStmt->freeHashFunc = insDestroyTableDataCxtHashMap;
pStmt->freeArrayFunc = insDestroyVgroupDataCxtList;
if (!reentry) {
pStmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
@ -1797,10 +1681,10 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifOpS
}
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
STableDataBlocks* pDataBuf = NULL;
int32_t code = getTableDataBlocks(pCxt, pStmt, &pDataBuf);
STableDataCxt* pTableCxt = NULL;
int32_t code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
if (TSDB_CODE_SUCCESS == code) {
code = parseDataFromFileImpl(pCxt, pStmt, pDataBuf);
code = parseDataFromFileImpl(pCxt, pStmt, pTableCxt);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1924,6 +1808,6 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
QUERY_EXEC_STAGE_SCHEDULE == (*pQuery)->execStage) {
code = setRefreshMate(*pQuery);
}
destroyBoundColumnInfo(&context.tags);
// destroyBoundColumnInfo(&context.tags);
return code;
}

View File

@ -955,3 +955,314 @@ int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray*
}
return TSDB_CODE_SUCCESS;
}
static void initBoundCols(int32_t ncols, int32_t* pBoundCols) {
for (int32_t i = 0; i < ncols; ++i) {
pBoundCols[i] = i;
}
}
static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
SSchema* pSchemas = getTableColumnSchema(pTableMeta);
for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
SColVal val = COL_VAL_NONE(pSchemas[i].colId, pSchemas[i].type);
taosArrayPush(pValues, &val);
}
}
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
pInfo->numOfCols = numOfBound;
pInfo->numOfBound = numOfBound;
pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int32_t));
if (NULL == pInfo->pColIndex) {
return TSDB_CODE_OUT_OF_MEMORY;
}
initBoundCols(numOfBound, pInfo->pColIndex);
return TSDB_CODE_SUCCESS;
}
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
if (NULL == pTableCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pTableCxt->pMeta = tableMetaDup(pTableMeta);
if (NULL == pTableCxt->pMeta) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pSchema =
tBuildTSchema(getTableColumnSchema(pTableMeta), pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
if (NULL == pTableCxt->pSchema) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = insInitBoundColsInfo(pTableMeta->tableInfo.numOfColumns, &pTableCxt->boundColsInfo);
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pValues = taosArrayInit(pTableMeta->tableInfo.numOfColumns, sizeof(SColVal));
if (NULL == pTableCxt->pValues) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
initColValues(pTableMeta, pTableCxt->pValues);
}
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == pTableCxt->pData) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pTableCxt->pData->suid = pTableMeta->suid;
pTableCxt->pData->uid = pTableMeta->uid;
pTableCxt->pData->sver = pTableMeta->sversion;
pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
if (NULL == pTableCxt->pData->aRowP) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
}
if (TSDB_CODE_SUCCESS == code) {
pTableCxt->pCreateTblReq = *pCreateTbReq;
*pCreateTbReq = NULL;
*pOutput = pTableCxt;
} else {
taosMemoryFree(pTableCxt);
}
return code;
}
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt) {
*pTableCxt = taosHashGet(pHash, id, idLen);
if (NULL != *pTableCxt) {
return TSDB_CODE_SUCCESS;
}
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt);
if (TSDB_CODE_SUCCESS == code) {
code = taosHashPut(pHash, id, idLen, pTableCxt, POINTER_BYTES);
}
return code;
}
void insDestroyTableDataCxt(STableDataCxt* pTableCxt) {
if (NULL == pTableCxt) {
return;
}
taosMemoryFreeClear(pTableCxt->pMeta);
tDestroyTSchema(pTableCxt->pSchema);
destroyBoundColInfo(&pTableCxt->boundColsInfo);
taosArrayDestroyEx(pTableCxt->pValues, NULL /*todo*/);
tdDestroySVCreateTbReq(pTableCxt->pCreateTblReq);
taosMemoryFreeClear(pTableCxt->pCreateTblReq);
tDestroySSubmitTbData(pTableCxt->pData);
}
void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
if (NULL == pVgCxt) {
return;
}
tDestroySSubmitReq2(pVgCxt->pData);
}
void insDestroyVgroupDataCxtList(SArray* pVgCxtList) {
if (NULL == pVgCxtList) {
return;
}
size_t size = taosArrayGetSize(pVgCxtList);
for (int32_t i = 0; i < size; i++) {
void* p = taosArrayGetP(pVgCxtList, i);
insDestroyVgroupDataCxt(p);
}
taosArrayDestroy(pVgCxtList);
}
void insDestroyVgroupDataCxtHashMap(SHashObj* pVgCxtHash) {
if (NULL == pVgCxtHash) {
return;
}
void** p = taosHashIterate(pVgCxtHash, NULL);
while (p) {
insDestroyVgroupDataCxt(*(SVgroupDataCxt**)p);
p = taosHashIterate(pVgCxtHash, p);
}
taosHashCleanup(pVgCxtHash);
}
void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
if (NULL == pTableCxtHash) {
return;
}
void** p = taosHashIterate(pTableCxtHash, NULL);
while (p) {
insDestroyTableDataCxt(*(STableDataCxt**)p);
p = taosHashIterate(pTableCxtHash, p);
}
taosHashCleanup(pTableCxtHash);
}
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) {
if (NULL != pTableCxt->pCreateTblReq) {
if (NULL == pVgCxt->pData->aCreateTbReq) {
pVgCxt->pData->aCreateTbReq = taosArrayInit(128, sizeof(SVCreateTbReq));
if (NULL == pVgCxt->pData->aCreateTbReq) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayPush(pVgCxt->pData->aCreateTbReq, pTableCxt->pCreateTblReq);
}
if (NULL == pVgCxt->pData->aSubmitTbData) {
pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
if (NULL == pVgCxt->pData->aSubmitTbData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
pTableCxt->pData = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHash, SArray* pVgroupList,
SVgroupDataCxt** pOutput) {
SVgroupDataCxt* pVgCxt = taosMemoryCalloc(1, sizeof(SVgroupDataCxt));
if (NULL == pVgCxt) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgCxt->pData = taosMemoryCalloc(1, sizeof(SSubmitReq2));
if (NULL == pVgCxt->pData) {
insDestroyVgroupDataCxt(pVgCxt);
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgCxt->vgId = pTableCxt->pMeta->vgId;
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(pVgroupList, &pVgCxt);
*pOutput = pVgCxt;
} else {
insDestroyVgroupDataCxt(pVgCxt);
}
return code;
}
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
if (NULL == pVgroupHash || NULL == pVgroupList) {
taosHashCleanup(pVgroupHash);
taosArrayDestroy(pVgroupList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
void* p = taosHashIterate(pTableHash, NULL);
while (TSDB_CODE_SUCCESS == code && NULL != p) {
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
if (TSDB_CODE_SUCCESS == code) {
int32_t vgId = pTableCxt->pMeta->vgId;
SVgroupDataCxt* pVgCxt = taosHashGet(pVgroupHash, &vgId, sizeof(vgId));
if (NULL == pVgCxt) {
code = createVgroupDataCxt(pTableCxt, pVgroupHash, pVgroupList, &pVgCxt);
}
if (TSDB_CODE_SUCCESS == code) {
code = fillVgroupDataCxt(pTableCxt, pVgCxt);
}
}
if (TSDB_CODE_SUCCESS == code) {
p = taosHashIterate(pTableHash, p);
}
}
taosHashCleanup(pVgroupHash);
if (TSDB_CODE_SUCCESS == code) {
*pVgDataBlocks = pVgroupList;
} else {
taosArrayDestroy(pVgroupList);
}
return code;
}
static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
void* pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
len += sizeof(SMsgHead);
pBuf = taosMemoryMalloc(len);
if (NULL == pBuf) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(vgId);
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder);
}
if (TSDB_CODE_SUCCESS == code) {
*pData = pBuf;
*pLen = len;
} else {
taosMemoryFree(pBuf);
}
return code;
}
int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, SArray** pVgDataBlocks) {
size_t numOfVg = taosArrayGetSize(pVgDataCxtList);
SArray* pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
if (NULL == pDataBlocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfVg; ++i) {
SVgroupDataCxt* src = taosArrayGetP(pVgDataCxtList, i);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == dst) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
}
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? TSDB_CODE_TSC_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
}
}
if (TSDB_CODE_SUCCESS == code) {
*pVgDataBlocks = pDataBlocks;
} else {
// todo
}
return code;
}