Merge remote-tracking branch 'origin/feat/TS-4243-3.0' into feat/TS-4243-3.0

This commit is contained in:
Haojun Liao 2024-03-26 17:34:43 +08:00
commit fd57ded11e
8 changed files with 178 additions and 154 deletions

View File

@ -197,7 +197,7 @@ typedef struct STableDataCxt {
SBoundColInfo boundColsInfo; SBoundColInfo boundColsInfo;
SArray* pValues; SArray* pValues;
SSubmitTbData* pData; SSubmitTbData* pData;
TSKEY lastTs; SRowKey lastKey;
bool ordered; bool ordered;
bool duplicateTs; bool duplicateTs;
} STableDataCxt; } STableDataCxt;

View File

@ -1632,7 +1632,7 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
uint8_t colType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type; uint8_t colType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
if (isPartial) { if (isPartial) {
if (3 != LIST_LENGTH(pFunc->pParameterList)) { if (3 != LIST_LENGTH(pFunc->pParameterList) && 4 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
} }
if (!IS_NUMERIC_TYPE(colType)) { if (!IS_NUMERIC_TYPE(colType)) {

View File

@ -2343,6 +2343,11 @@ EFuncDataRequired firstDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
if (pResult->hasResult) { if (pResult->hasResult) {
if (pResult->pkBytes > 0) {
pResult->pkData = pResult->buf + pResult->bytes;
} else {
pResult->pkData = NULL;
}
if (pResult->ts < pBlockInfo->window.skey) { if (pResult->ts < pBlockInfo->window.skey) {
return FUNC_DATA_REQUIRED_NOT_LOAD; return FUNC_DATA_REQUIRED_NOT_LOAD;
} else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) { } else if (pResult->ts == pBlockInfo->window.skey && pResult->pkData) {
@ -2366,6 +2371,11 @@ EFuncDataRequired lastDynDataReq(void* pRes, SDataBlockInfo* pBlockInfo) {
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
if (pResult->hasResult) { if (pResult->hasResult) {
if (pResult->pkBytes > 0) {
pResult->pkData = pResult->buf + pResult->bytes;
} else {
pResult->pkData = NULL;
}
if (pResult->ts > pBlockInfo->window.ekey) { if (pResult->ts > pBlockInfo->window.ekey) {
return FUNC_DATA_REQUIRED_NOT_LOAD; return FUNC_DATA_REQUIRED_NOT_LOAD;
} else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) { } else if (pResult->ts == pBlockInfo->window.ekey && pResult->pkData) {
@ -6213,12 +6223,17 @@ static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char*
} }
} }
static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo) { static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo, bool isMerge) {
if (pCtx->hasPrimaryKey) { if (pCtx->hasPrimaryKey) {
pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type; if (!isMerge) {
pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes; pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type;
pRateInfo->firstPk = pRateInfo->pkData; pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes; pRateInfo->firstPk = pRateInfo->pkData;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
} else {
pRateInfo->firstPk = pRateInfo->pkData;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
}
} else { } else {
pRateInfo->firstPk = NULL; pRateInfo->firstPk = NULL;
pRateInfo->lastPk = NULL; pRateInfo->lastPk = NULL;
@ -6236,7 +6251,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
funcInputUpdate(pCtx); funcInputUpdate(pCtx);
initializeRateInfo(pCtx, pRateInfo); initializeRateInfo(pCtx, pRateInfo, false);
int32_t numOfElems = 0; int32_t numOfElems = 0;
int32_t type = pInputCol->info.type; int32_t type = pInputCol->info.type;
@ -6358,13 +6373,13 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
} }
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
initializeRateInfo(pCtx, pInfo); initializeRateInfo(pCtx, pInfo, true);
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) { for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i); char* data = colDataGetData(pCol, i);
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data); SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
initializeRateInfo(pCtx, pInfo); initializeRateInfo(pCtx, pInfo, true);
if (pInputInfo->hasResult) { if (pInputInfo->hasResult) {
int32_t code = irateTransferInfo(pInputInfo, pInfo); int32_t code = irateTransferInfo(pInputInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -415,6 +415,8 @@ static int32_t createPartialFunction(const SFunctionNode* pSrcFunc, SFunctionNod
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc); int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", (*pPartialFunc)->functionName, pSrcFunc);
taosCreateMD5Hash(name, len); taosCreateMD5Hash(name, len);
strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1); strncpy((*pPartialFunc)->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
(*pPartialFunc)->hasPk = pSrcFunc->hasPk;
(*pPartialFunc)->pkBytes = pSrcFunc->pkBytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -453,7 +455,8 @@ static int32_t createMidFunction(const SFunctionNode* pSrcFunc, const SFunctionN
} else { } else {
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
} }
(*pMidFunc)->hasPk = pPartialFunc->hasPk;
(*pMidFunc)->pkBytes = pPartialFunc->pkBytes;
return code; return code;
} }
@ -482,7 +485,8 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
} else { } else {
nodesDestroyList(pParameterList); nodesDestroyList(pParameterList);
} }
(*pMergeFunc)->hasPk = pPartialFunc->hasPk;
(*pMergeFunc)->pkBytes = pPartialFunc->pkBytes;
return code; return code;
} }

View File

@ -45,8 +45,8 @@ int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
SArray *tagName, uint8_t tagNum, int32_t ttl); SArray *tagName, uint8_t tagNum, int32_t ttl);
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo); int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues); void insInitColValues(STableMeta *pTableMeta, SArray *aColValues);
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey); void insCheckTableDataOrder(STableDataCxt *pTableCxt, SRowKey *rowKey);
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta, int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals); SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
int32_t initTableColSubmitData(STableDataCxt *pTableCxt); int32_t initTableColSubmitData(STableDataCxt *pTableCxt);

View File

@ -113,7 +113,8 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]]; SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
SSmlKv* kv = taosArrayGet(cols, i); SSmlKv* kv = taosArrayGet(cols, i);
if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){ if (kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 ||
kv->type != pTagSchema->type) {
code = TSDB_CODE_SML_INVALID_DATA; code = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildTagRow error col not same %s", pTagSchema->name); uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
goto end; goto end;
@ -200,7 +201,9 @@ int32_t smlBuildRow(STableDataCxt* pTableCxt) {
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != ret) {
return ret; return ret;
} }
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); SRowKey key;
tRowGetKey(*pRow, &key);
insCheckTableDataOrder(pTableCxt, &key);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -209,15 +212,16 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
SSchema* pColSchema = schema + index; SSchema* pColSchema = schema + index;
SColVal* pVal = taosArrayGet(pTableCxt->pValues, index); SColVal* pVal = taosArrayGet(pTableCxt->pValues, index);
SSmlKv* kv = (SSmlKv*)data; SSmlKv* kv = (SSmlKv*)data;
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){ if (kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 ||
kv->type != pColSchema->type) {
ret = TSDB_CODE_SML_INVALID_DATA; ret = TSDB_CODE_SML_INVALID_DATA;
char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1); char* tmp = taosMemoryCalloc(kv->keyLen + 1, 1);
if(tmp){ if (tmp) {
memcpy(tmp, kv->key, kv->keyLen); memcpy(tmp, kv->key, kv->keyLen);
uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", uInfo("SML data(name:%s type:%s) is not same like the db data(name:%s type:%s)", tmp, tDataTypes[kv->type].name,
tmp, tDataTypes[kv->type].name, pColSchema->name, tDataTypes[pColSchema->type].name); pColSchema->name, tDataTypes[pColSchema->type].name);
taosMemoryFree(tmp); taosMemoryFree(tmp);
}else{ } else {
uError("SML smlBuildCol out of memory"); uError("SML smlBuildCol out of memory");
} }
goto end; goto end;
@ -225,11 +229,11 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
if (kv->type == TSDB_DATA_TYPE_NCHAR) { if (kv->type == TSDB_DATA_TYPE_NCHAR) {
int32_t len = 0; int32_t len = 0;
int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE; int64_t size = pColSchema->bytes - VARSTR_HEADER_SIZE;
if(size <= 0){ if (size <= 0) {
ret = TSDB_CODE_SML_INVALID_DATA; ret = TSDB_CODE_SML_INVALID_DATA;
goto end; goto end;
} }
char* pUcs4 = taosMemoryCalloc(1, size); char* pUcs4 = taosMemoryCalloc(1, size);
if (NULL == pUcs4) { if (NULL == pUcs4) {
ret = TSDB_CODE_OUT_OF_MEMORY; ret = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
@ -351,7 +355,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
continue; continue;
} }
SSmlKv* kv = *(SSmlKv**)p; SSmlKv* kv = *(SSmlKv**)p;
if(kv->type != pColSchema->type){ if (kv->type != pColSchema->type) {
ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type"); ret = buildInvalidOperationMsg(&pBuf, "kv type not equal to col type");
goto end; goto end;
} }
@ -367,7 +371,8 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
} }
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) { if (errno == E2BIG) {
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value); uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length,
pColSchema->bytes, kv->value);
buildInvalidOperationMsg(&pBuf, "value too long"); buildInvalidOperationMsg(&pBuf, "value too long");
ret = TSDB_CODE_PAR_VALUE_TOO_LONG; ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end; goto end;
@ -396,7 +401,9 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
buildInvalidOperationMsg(&pBuf, "tRowBuild error"); buildInvalidOperationMsg(&pBuf, "tRowBuild error");
goto end; goto end;
} }
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); SRowKey key;
tRowGetKey(*pRow, &key);
insCheckTableDataOrder(pTableCxt, &key);
clearColValArraySml(pTableCxt->pValues); clearColValArraySml(pTableCxt->pValues);
} }

View File

@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "geosWrapper.h"
#include "parInsertUtil.h" #include "parInsertUtil.h"
#include "parToken.h" #include "parToken.h"
#include "scalar.h" #include "scalar.h"
#include "tglobal.h" #include "tglobal.h"
#include "ttime.h" #include "ttime.h"
#include "geosWrapper.h"
typedef struct SInsertParseContext { typedef struct SInsertParseContext {
SParseContext* pComCxt; SParseContext* pComCxt;
@ -29,7 +29,7 @@ typedef struct SInsertParseContext {
bool usingDuplicateTable; bool usingDuplicateTable;
bool forceUpdate; bool forceUpdate;
bool needTableTagVal; bool needTableTagVal;
bool needRequest; // whether or not request server bool needRequest; // whether or not request server
} SInsertParseContext; } SInsertParseContext;
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param); typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@ -154,19 +154,15 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
return code; return code;
} }
typedef enum { typedef enum { BOUND_TAGS, BOUND_COLUMNS, BOUND_ALL_AND_TBNAME } EBoundColumnsType;
BOUND_TAGS,
BOUND_COLUMNS,
BOUND_ALL_AND_TBNAME
} EBoundColumnsType;
static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) { static int32_t getTbnameSchemaIndex(STableMeta* pTableMeta) {
return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; return pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns;
} }
// pStmt->pSql -> field1_name, ...) // pStmt->pSql -> field1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType, STableMeta* pTableMeta, static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, EBoundColumnsType boundColsType,
SBoundColInfo* pBoundInfo) { STableMeta* pTableMeta, SBoundColInfo* pBoundInfo) {
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
if (boundColsType == BOUND_TAGS) { if (boundColsType == BOUND_TAGS) {
pSchema = getTableTagSchema(pTableMeta); pSchema = getTableTagSchema(pTableMeta);
@ -202,8 +198,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
token.z = tmpTokenBuf; token.z = tmpTokenBuf;
token.n = strdequote(token.z); token.n = strdequote(token.z);
if (boundColsType == BOUND_ALL_AND_TBNAME && if (boundColsType == BOUND_ALL_AND_TBNAME && token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex; pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
pUseCols[tbnameSchemaIndex] = true; pUseCols[tbnameSchemaIndex] = true;
++pBoundInfo->numOfBound; ++pBoundInfo->numOfBound;
@ -230,7 +225,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) { if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null"); code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
} }
if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) &&!pUseCols[tbnameSchemaIndex]) { if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) {
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null"); code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
} }
taosMemoryFree(pUseCols); taosMemoryFree(pUseCols);
@ -285,10 +280,11 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
int32_t index = 0, i = 0; int32_t index = 0, i = 0;
int64_t interval = 0, tempInterval = 0; int64_t interval = 0, tempInterval = 0;
int64_t ts = 0, tempTs = 0; int64_t ts = 0, tempTs = 0;
bool firstIsTS = false, secondIsTs = false; bool firstIsTS = false, secondIsTs = false;
const char* pTokenEnd = *end; const char* pTokenEnd = *end;
if (TSDB_CODE_SUCCESS != parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS)) { if (TSDB_CODE_SUCCESS !=
parseTimestampOrInterval(&pTokenEnd, pToken, timePrec, &ts, &interval, pMsgBuf, &firstIsTS)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
} }
@ -330,8 +326,7 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') { if (pTokenEnd[i] == ' ' || pTokenEnd[i] == '\t') {
i++; i++;
continue; continue;
} } else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
else if (pTokenEnd[i] == ',' || pTokenEnd[i] == ')') {
*end = pTokenEnd + i; *end = pTokenEnd + i;
if (!firstIsTS) { if (!firstIsTS) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
@ -362,7 +357,8 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
valueToken.n = len; valueToken.n = len;
} }
if (TSDB_CODE_SUCCESS != parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs)) { if (TSDB_CODE_SUCCESS !=
parseTimestampOrInterval(&pTokenEnd, &valueToken, timePrec, &tempTs, &tempInterval, pMsgBuf, &secondIsTs)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
} }
@ -376,7 +372,7 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
} }
ts = tempTs; ts = tempTs;
}else { } else {
// not support operator between tow interval, such as 2h + 3s // not support operator between tow interval, such as 2h + 3s
if (!firstIsTS) { if (!firstIsTS) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
@ -413,7 +409,7 @@ static int parseTime(const char** end, SToken* pToken, int16_t timePrec, int64_t
} }
// need to call geosFreeBuffer(*output) later // need to call geosFreeBuffer(*output) later
static int parseGeometry(SToken *pToken, unsigned char **output, size_t *size) { static int parseGeometry(SToken* pToken, unsigned char** output, size_t* size) {
int32_t code = TSDB_CODE_FAILED; int32_t code = TSDB_CODE_FAILED;
//[ToDo] support to parse WKB as well as WKT //[ToDo] support to parse WKB as well as WKT
@ -432,19 +428,19 @@ static int parseGeometry(SToken *pToken, unsigned char **output, size_t *size) {
return code; return code;
} }
static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData, int32_t bytes){ static int32_t parseVarbinary(SToken* pToken, uint8_t** pData, uint32_t* nData, int32_t bytes) {
if(pToken->type != TK_NK_STRING){ if (pToken->type != TK_NK_STRING) {
return TSDB_CODE_PAR_INVALID_VARBINARY; return TSDB_CODE_PAR_INVALID_VARBINARY;
} }
if(isHex(pToken->z + 1, pToken->n - 2)){ if (isHex(pToken->z + 1, pToken->n - 2)) {
if(!isValidateHex(pToken->z + 1, pToken->n - 2)){ if (!isValidateHex(pToken->z + 1, pToken->n - 2)) {
return TSDB_CODE_PAR_INVALID_VARBINARY; return TSDB_CODE_PAR_INVALID_VARBINARY;
} }
void* data = NULL; void* data = NULL;
uint32_t size = 0; uint32_t size = 0;
if(taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0){ if (taosHex2Ascii(pToken->z + 1, pToken->n - 2, &data, &size) < 0) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -454,7 +450,7 @@ static int32_t parseVarbinary(SToken* pToken, uint8_t **pData, uint32_t *nData,
} }
*pData = data; *pData = data;
*nData = size; *nData = size;
}else{ } else {
*pData = taosMemoryCalloc(1, pToken->n); *pData = taosMemoryCalloc(1, pToken->n);
int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n); int32_t len = trimString(pToken->z, pToken->n, *pData, pToken->n);
*nData = len; *nData = len;
@ -633,7 +629,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
} }
case TSDB_DATA_TYPE_VARBINARY: { case TSDB_DATA_TYPE_VARBINARY: {
code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes); code = parseVarbinary(pToken, &val->pData, &val->nData, pSchema->bytes);
if(code != TSDB_CODE_SUCCESS){ if (code != TSDB_CODE_SUCCESS) {
return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name); return generateSyntaxErrMsg(pMsgBuf, code, pSchema->name);
} }
break; break;
@ -759,8 +755,8 @@ static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* p
int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) { int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf, int8_t type) {
if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER && if ((pToken->type != TK_NOW && pToken->type != TK_TODAY && pToken->type != TK_NK_INTEGER &&
pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL && pToken->type != TK_NK_STRING && pToken->type != TK_NK_FLOAT && pToken->type != TK_NK_BOOL &&
pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NULL && pToken->type != TK_NK_HEX && pToken->type != TK_NK_OCT && pToken->type != TK_NK_BIN &&
pToken->type != TK_NK_BIN && pToken->type != TK_NK_VARIABLE) || pToken->type != TK_NK_VARIABLE) ||
(pToken->n == 0) || (pToken->type == TK_NK_RP)) { (pToken->n == 0) || (pToken->type == TK_NK_RP)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z); return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
} }
@ -1114,8 +1110,8 @@ static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache,
return code; return code;
} }
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, bool* pMissCache,
bool* pMissCache, bool bUsingTable) { bool bUsingTable) {
SParseContext* pComCxt = pCxt->pComCxt; SParseContext* pComCxt = pCxt->pComCxt;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pComCxt->async) { if (pComCxt->async) {
@ -1220,7 +1216,7 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStm
pCxt->missCache = true; pCxt->missCache = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode* pTagCond = NULL; SNode* pTagCond = NULL;
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pTagCond); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pTagCond);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache); code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
@ -1365,13 +1361,11 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifyOp
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z); return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is expected", token.z);
} }
// pStmt->pSql -> field1_name, ...) // pStmt->pSql -> field1_name, ...)
return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, return parseBoundColumns(pCxt, &pStmt->pSql, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
&pTableCxt->boundColsInfo);
} }
if (NULL != pStmt->pBoundCols) { if (NULL != pStmt->pBoundCols) {
return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, return parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_COLUMNS, pStmt->pTableMeta, &pTableCxt->boundColsInfo);
&pTableCxt->boundColsInfo);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1679,34 +1673,35 @@ static void clearColValArray(SArray* pCols) {
typedef struct SStbRowsDataContext { typedef struct SStbRowsDataContext {
SName stbName; SName stbName;
STableMeta* pStbMeta; STableMeta* pStbMeta;
SNode* pTagCond; SNode* pTagCond;
SBoundColInfo boundColsInfo; SBoundColInfo boundColsInfo;
// the following fields are for each stb row // the following fields are for each stb row
SArray* aTagVals; SArray* aTagVals;
SArray* aColVals; SArray* aColVals;
SArray* aTagNames; SArray* aTagNames;
SName ctbName; SName ctbName;
STag* pTag; STag* pTag;
STableMeta* pCtbMeta; STableMeta* pCtbMeta;
SVCreateTbReq* pCreateCtbReq; SVCreateTbReq* pCreateCtbReq;
bool hasTimestampTag; bool hasTimestampTag;
bool isJsonTag; bool isJsonTag;
} SStbRowsDataContext; } SStbRowsDataContext;
typedef union SRowsDataContext{ typedef union SRowsDataContext {
STableDataCxt* pTableDataCxt; STableDataCxt* pTableDataCxt;
SStbRowsDataContext* pStbRowsCxt; SStbRowsDataContext* pStbRowsCxt;
} SRowsDataContext; } SRowsDataContext;
static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, bool* pFoundCtbName) { static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext* pStbRowsCxt, SToken* pToken,
bool* pFoundCtbName) {
*pFoundCtbName = false; *pFoundCtbName = false;
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY); int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, TSDB_DATA_TYPE_BINARY);
if (TK_NK_VARIABLE == pToken->type) { if (TK_NK_VARIABLE == pToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname"); code = buildInvalidOperationMsg(&pCxt->msg, "not expected tbname");
} }
if (code == TSDB_CODE_SUCCESS){ if (code == TSDB_CODE_SUCCESS) {
if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) { if (isNullValue(TSDB_DATA_TYPE_BINARY, pToken)) {
return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be null value"); return buildInvalidOperationMsg(&pCxt->msg, "tbname can not be null value");
} }
@ -1733,9 +1728,8 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
} }
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
SStbRowsDataContext* pStbRowsCxt, bool ctbFirst, SStbRowsDataContext* pStbRowsCxt, bool ctbFirst, const SToken* tagTokens,
const SToken* tagTokens, SSchema* const* tagSchemas, SSchema* const* tagSchemas, int numOfTagTokens) {
int numOfTagTokens) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
uint8_t precision = pStmt->pTableMeta->tableInfo.precision; uint8_t precision = pStmt->pTableMeta->tableInfo.precision;
@ -1749,8 +1743,8 @@ static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModif
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals, code = parseTagValue(&pCxt->msg, NULL, precision, pTagSchema, pTagToken, pStbRowsCxt->aTagNames,
&pStbRowsCxt->pTag); pStbRowsCxt->aTagVals, &pStbRowsCxt->pTag);
} }
} }
if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) { if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
@ -1765,9 +1759,9 @@ static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModif
} }
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, SToken* pToken, SStbRowsDataContext* pStbRowsCxt, SToken* pToken, const SBoundColInfo* pCols,
const SBoundColInfo* pCols, const SSchema* pSchemas, const SSchema* pSchemas, SToken* tagTokens, SSchema** tagSchemas, int* pNumOfTagTokens,
SToken* tagTokens, SSchema** tagSchemas, int* pNumOfTagTokens, bool* bFoundTbName) { bool* bFoundTbName) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SArray* pTagNames = pStbRowsCxt->aTagNames; SArray* pTagNames = pStbRowsCxt->aTagNames;
SArray* pTagVals = pStbRowsCxt->aTagVals; SArray* pTagVals = pStbRowsCxt->aTagVals;
@ -1791,10 +1785,10 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
} }
if (pCols->pColIndex[i] < numOfCols) { if (pCols->pColIndex[i] < numOfCols) {
const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]]; const SSchema* pSchema = &pSchemas[pCols->pColIndex[i]];
SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]); SColVal* pVal = taosArrayGet(pStbRowsCxt->aColVals, pCols->pColIndex[i]);
code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, precision, pVal); code = parseValueToken(pCxt, ppSql, pToken, (SSchema*)pSchema, precision, pVal);
if (TK_NK_VARIABLE == pToken->type) { if (TK_NK_VARIABLE == pToken->type) {
code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value"); code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
} }
} else if (pCols->pColIndex[i] < tbnameIdx) { } else if (pCols->pColIndex[i] < tbnameIdx) {
const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]]; const SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
@ -1808,11 +1802,11 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value"); code = buildInvalidOperationMsg(&pCxt->msg, "not expected row value");
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals, &pStbRowsCxt->pTag); code = parseTagValue(&pCxt->msg, ppSql, precision, (SSchema*)pTagSchema, pToken, pTagNames, pTagVals,
&pStbRowsCxt->pTag);
} }
} }
} } else if (pCols->pColIndex[i] == tbnameIdx) {
else if (pCols->pColIndex[i] == tbnameIdx) {
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName); code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, bFoundTbName);
} }
@ -1826,22 +1820,21 @@ static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
return code; return code;
} }
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken, bool* pCtbFirst) {
SToken* pToken, bool *pCtbFirst) {
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo; SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta); SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
bool bFoundTbName = false; bool bFoundTbName = false;
const char* pOrigSql = *ppSql; const char* pOrigSql = *ppSql;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SToken tagTokens[TSDB_MAX_TAGS] = {0}; SToken tagTokens[TSDB_MAX_TAGS] = {0};
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0}; SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
int numOfTagTokens = 0; int numOfTagTokens = 0;
code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, tagTokens, code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, tagTokens, tagSchemas,
tagSchemas, &numOfTagTokens, &bFoundTbName); &numOfTagTokens, &bFoundTbName);
if (code == TSDB_CODE_SUCCESS && !bFoundTbName) { if (code == TSDB_CODE_SUCCESS && !bFoundTbName) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql); code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
@ -1870,7 +1863,8 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
return code; return code;
} }
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) { static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
SStbRowsDataContext* pStbRowsCxt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
@ -1878,16 +1872,16 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag, pStbRowsCxt->pStbMeta->uid, insBuildCreateTbReq(pStbRowsCxt->pCreateCtbReq, pStbRowsCxt->ctbName.tname, pStbRowsCxt->pTag,
pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames, getNumOfTags(pStbRowsCxt->pStbMeta), pStbRowsCxt->pStbMeta->uid, pStbRowsCxt->stbName.tname, pStbRowsCxt->aTagNames,
TSDB_DEFAULT_TABLE_TTL); getNumOfTags(pStbRowsCxt->pStbMeta), TSDB_DEFAULT_TABLE_TTL);
pStbRowsCxt->pTag = NULL; pStbRowsCxt->pTag = NULL;
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
char ctbFName[TSDB_TABLE_FNAME_LEN]; char ctbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName); tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
SVgroupInfo vg; SVgroupInfo vg;
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter, SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
.requestId = pCxt->pComCxt->requestId, .requestId = pCxt->pComCxt->requestId,
.requestObjRefId = pCxt->pComCxt->requestRid, .requestObjRefId = pCxt->pComCxt->requestRid,
@ -1907,7 +1901,6 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
return code; return code;
} }
static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) { static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
if (pStbRowsCxt == NULL) return; if (pStbRowsCxt == NULL) return;
@ -1938,7 +1931,7 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
} }
if (code == TSDB_CODE_SUCCESS && bFirstTable) { if (code == TSDB_CODE_SUCCESS && bFirstTable) {
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
} }
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
@ -1948,7 +1941,9 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1); SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow); code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
insCheckTableDataOrder(*ppTableDataCxt, TD_ROW_KEY(*pRow)); SRowKey key;
tRowGetKey(*pRow, &key);
insCheckTableDataOrder(*ppTableDataCxt, &key);
} }
} }
@ -1961,7 +1956,8 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow, SToken* pToken) { static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataCxt* pTableCxt, bool* pGotRow,
SToken* pToken) {
SBoundColInfo* pCols = &pTableCxt->boundColsInfo; SBoundColInfo* pCols = &pTableCxt->boundColsInfo;
bool isParseBindParam = false; bool isParseBindParam = false;
SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta); SSchema* pSchemas = getTableColumnSchema(pTableCxt->pMeta);
@ -2014,7 +2010,9 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow)); SRowKey key;
tRowGetKey(*pRow, &key);
insCheckTableDataOrder(pTableCxt, &key);
} }
} }
@ -2112,10 +2110,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token); code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
} else { } else {
STableDataCxt* pTableDataCxt = NULL; STableDataCxt* pTableDataCxt = NULL;
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt); code =
parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt; SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
void* pData = pTableDataCxt; void* pData = pTableDataCxt;
taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData, taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
POINTER_BYTES); POINTER_BYTES);
} }
@ -2149,11 +2148,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
return code; return code;
} }
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) { static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
SRowsDataContext rowsDataCxt) {
// init only for file // init only for file
if (NULL == pStmt->pTableCxtHashObj) { if (NULL == pStmt->pTableCxtHashObj) {
pStmt->pTableCxtHashObj = pStmt->pTableCxtHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
} }
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows); int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
@ -2285,7 +2284,8 @@ static int32_t constructStbRowsDataContext(SVnodeModifyOpStmt* pStmt, SStbRowsDa
static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!pStmt->pBoundCols) { if (!pStmt->pBoundCols) {
return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion", pStmt->pSql); return buildSyntaxErrMsg(&pCxt->msg, "(...tbname, ts...) bounded cols is expected for supertable insertion",
pStmt->pSql);
} }
SStbRowsDataContext* pStbRowsCxt = NULL; SStbRowsDataContext* pStbRowsCxt = NULL;
@ -2297,7 +2297,7 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
pStbRowsCxt->hasTimestampTag = false; pStbRowsCxt->hasTimestampTag = false;
for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) { for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i]; int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta) ) { if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta)) {
if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) { if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
pStbRowsCxt->hasTimestampTag = true; pStbRowsCxt->hasTimestampTag = true;
} }
@ -2326,8 +2326,8 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
return buildInvalidOperationMsg(&pCxt->msg, "insert into super table syntax is not supported for stmt"); return buildInvalidOperationMsg(&pCxt->msg, "insert into super table syntax is not supported for stmt");
} }
if (!pStmt->stbSyntax) { if (!pStmt->stbSyntax) {
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt); int32_t code = parseSchemaClauseBottom(pCxt, pStmt, &pTableCxt);
SRowsDataContext rowsDataCxt; SRowsDataContext rowsDataCxt;
rowsDataCxt.pTableDataCxt = pTableCxt; rowsDataCxt.pTableDataCxt = pTableCxt;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -2625,7 +2625,7 @@ static int32_t checkSubtablePrivilegeForTable(const SArray* pTables, SVnodeModif
} }
static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData, static int32_t processTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
SVnodeModifyOpStmt* pStmt, bool isStb) { SVnodeModifyOpStmt* pStmt, bool isStb) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) { if (!isStb && TSDB_SUPER_TABLE == pStmt->pTableMeta->tableType) {
code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported"); code = buildInvalidOperationMsg(&pCxt->msg, "insert data into super table is not supported");
@ -2767,11 +2767,11 @@ static int32_t parseInsertSqlFromStart(SInsertParseContext* pCxt, SVnodeModifyOp
} }
static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) { static int32_t parseInsertSqlFromCsv(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRowsDataContext rowsDataCxt; SRowsDataContext rowsDataCxt;
if (!pStmt->stbSyntax) { if (!pStmt->stbSyntax) {
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
code = getTableDataCxt(pCxt, pStmt, &pTableCxt); code = getTableDataCxt(pCxt, pStmt, &pTableCxt);
rowsDataCxt.pTableDataCxt = pTableCxt; rowsDataCxt.pTableDataCxt = pTableCxt;
} else { } else {

View File

@ -170,9 +170,7 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
} }
} }
void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { void insInitColValues(STableMeta* pTableMeta, SArray* aColValues) { initColValues(pTableMeta, aColValues); }
initColValues(pTableMeta, aColValues);
}
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) { int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
pInfo->numOfCols = numOfBound; pInfo->numOfCols = numOfBound;
@ -187,21 +185,22 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) { void insCheckTableDataOrder(STableDataCxt* pTableCxt, SRowKey* rowKey) {
// once the data block is disordered, we do NOT keep last timestamp any more // once the data block is disordered, we do NOT keep last timestamp any more
if (!pTableCxt->ordered) { if (!pTableCxt->ordered) {
return; return;
} }
if (tsKey < pTableCxt->lastTs) { if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) < 0) {
pTableCxt->ordered = false; pTableCxt->ordered = false;
} }
if (tsKey == pTableCxt->lastTs) { if (tRowKeyCompare(rowKey, &pTableCxt->lastKey) == 0) {
pTableCxt->duplicateTs = true; pTableCxt->duplicateTs = true;
} }
pTableCxt->lastTs = tsKey; // TODO: for variable length data type, we need to copy it out
pTableCxt->lastKey = *rowKey;
return; return;
} }
@ -217,7 +216,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pTableCxt->lastTs = 0; pTableCxt->lastKey = (SRowKey){0};
pTableCxt->ordered = true; pTableCxt->ordered = true;
pTableCxt->duplicateTs = false; pTableCxt->duplicateTs = false;
@ -254,7 +253,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
pTableCxt->pData->uid = pTableMeta->uid; pTableCxt->pData->uid = pTableMeta->uid;
pTableCxt->pData->sver = pTableMeta->sversion; pTableCxt->pData->sver = pTableMeta->sversion;
pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL; pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL;
if(pCreateTbReq != NULL) *pCreateTbReq = NULL; if (pCreateTbReq != NULL) *pCreateTbReq = NULL;
if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData)); pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
if (NULL == pTableCxt->pData->aCol) { if (NULL == pTableCxt->pData->aCol) {
@ -276,7 +275,7 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
} }
static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) { static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData)); SSubmitTbData* pTmp = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == pTmp) { if (NULL == pTmp) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -317,7 +316,6 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
return code; return code;
} }
static void resetColValues(SArray* pValues) { static void resetColValues(SArray* pValues) {
int32_t num = taosArrayGetSize(pValues); int32_t num = taosArrayGetSize(pValues);
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
@ -463,7 +461,7 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES); int32_t code = taosHashPut(pVgroupHash, &pVgCxt->vgId, sizeof(pVgCxt->vgId), &pVgCxt, POINTER_BYTES);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
taosArrayPush(pVgroupList, &pVgCxt); taosArrayPush(pVgroupList, &pVgCxt);
// uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid); // uDebug("td23101 2vgId:%d, uid:%" PRIu64, pVgCxt->vgId, pTableCxt->pMeta->uid);
*pOutput = pVgCxt; *pOutput = pVgCxt;
} else { } else {
insDestroyVgroupDataCxt(pVgCxt); insDestroyVgroupDataCxt(pVgCxt);
@ -613,7 +611,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData); dst->numOfTables = taosArrayGetSize(src->pData->aSubmitTbData);
code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); code = taosHashGetDup(pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
// uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps); // uError("td23101 3vgId:%d, numEps:%d", src->vgId, dst->vg.epSet.numOfEps);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
@ -634,7 +632,7 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) { static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
if(strcmp(pSchema->name, fields[i].name) == 0){ if (strcmp(pSchema->name, fields[i].name) == 0) {
return true; return true;
} }
} }
@ -644,7 +642,8 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) { int numFields, bool needChangeLength) {
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); void* tmp =
taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false); sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false);
@ -654,7 +653,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
} }
pTableCxt->pData->flags |= TD_REQ_FROM_TAOX; pTableCxt->pData->flags |= TD_REQ_FROM_TAOX;
if(tmp == NULL){ if (tmp == NULL) {
ret = initTableColSubmitData(pTableCxt); ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("initTableColSubmitData error"); uError("initTableColSubmitData error");
@ -663,8 +662,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
} }
char* p = (char*)data; char* p = (char*)data;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each column // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
// length | // column length |
int32_t version = *(int32_t*)data; int32_t version = *(int32_t*)data;
p += sizeof(int32_t); p += sizeof(int32_t);
p += sizeof(int32_t); p += sizeof(int32_t);
@ -699,8 +698,8 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
if(tFields == NULL){ if (tFields == NULL) {
for (int j = 0; j < boundInfo->numOfBound; j++){ for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j]; SSchema* pColSchema = &pSchema[j];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
@ -717,7 +716,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
} }
char* pData = pStart; char* pData = pStart;
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if(ret != 0){ if (ret != 0) {
goto end; goto end;
} }
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
@ -727,11 +726,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
pStart += colLength[j]; pStart += colLength[j];
} }
} }
}else{ } else {
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
for (int j = 0; j < boundInfo->numOfBound; j++){ for (int j = 0; j < boundInfo->numOfBound; j++) {
SSchema* pColSchema = &pSchema[j]; SSchema* pColSchema = &pSchema[j];
if(strcmp(pColSchema->name, tFields[i].name) == 0){ if (strcmp(pColSchema->name, tFields[i].name) == 0) {
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal"); uError("type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
@ -748,7 +747,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, j);
ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); ret = tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if(ret != 0){ if (ret != 0) {
goto end; goto end;
} }
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
@ -761,17 +760,16 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
break; break;
} }
} }
} }
for (int c = 0; c < boundInfo->numOfBound; ++c) { for (int c = 0; c < boundInfo->numOfBound; ++c) {
if( boundInfo->pColIndex[c] != -1){ if (boundInfo->pColIndex[c] != -1) {
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL); ret = tColDataAddValueByDataBlock(pCol, 0, 0, numOfRows, NULL, NULL);
if(ret != 0){ if (ret != 0) {
goto end; goto end;
} }
}else{ } else {
boundInfo->pColIndex[c] = c; // restore for next block boundInfo->pColIndex[c] = c; // restore for next block
} }
} }