TD-12506 insert bug fix
This commit is contained in:
parent
d3500665b6
commit
71e2b6b3f2
|
@ -121,6 +121,7 @@ typedef struct SRequestObj {
|
||||||
char *msgBuf;
|
char *msgBuf;
|
||||||
void *pInfo; // sql parse info, generated by parser module
|
void *pInfo; // sql parse info, generated by parser module
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
uint64_t affectedRows;
|
||||||
SQueryExecMetric metric;
|
SQueryExecMetric metric;
|
||||||
SRequestSendRecvBody body;
|
SRequestSendRecvBody body;
|
||||||
} SRequestObj;
|
} SRequestObj;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "scheduler.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
#include "tconfig.h"
|
#include "tconfig.h"
|
||||||
|
@ -230,6 +231,8 @@ void taos_init_imp(void) {
|
||||||
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||||
catalogInit(&cfg);
|
catalogInit(&cfg);
|
||||||
|
|
||||||
|
SSchedulerCfg scfg = {.maxJobNum = 100};
|
||||||
|
schedulerInit(&scfg);
|
||||||
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
|
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
|
||||||
|
|
||||||
taosSetCoreDump(true);
|
taosSetCoreDump(true);
|
||||||
|
|
|
@ -196,7 +196,15 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
|
||||||
|
pRequest->type = pQuery->type;
|
||||||
|
return qCreateQueryDag(pQuery, pDag);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
|
||||||
|
if (TSDB_SQL_INSERT == pRequest->type) {
|
||||||
|
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
|
||||||
|
}
|
||||||
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,7 +291,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||||
if (qIsDdlQuery(pQuery)) {
|
if (qIsDdlQuery(pQuery)) {
|
||||||
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
||||||
} else {
|
} else {
|
||||||
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
|
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
|
||||||
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
|
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,64 +64,6 @@ typedef struct SInsertParseContext {
|
||||||
SInsertStmtInfo* pOutput;
|
SInsertStmtInfo* pOutput;
|
||||||
} SInsertParseContext;
|
} SInsertParseContext;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
|
|
||||||
errno = 0;
|
|
||||||
*value = strtold(pToken->z, endPtr);
|
|
||||||
|
|
||||||
// not a valid integer number, return error
|
|
||||||
if ((*endPtr - pToken->z) != pToken->n) {
|
|
||||||
return TK_ILLEGAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pToken->type;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, bool issigned) {
|
|
||||||
errno = 0;
|
|
||||||
int32_t ret = 0;
|
|
||||||
|
|
||||||
char* endPtr = NULL;
|
|
||||||
if (type == TK_FLOAT) {
|
|
||||||
double v = strtod(z, &endPtr);
|
|
||||||
if ((errno == ERANGE && v == HUGE_VALF) || isinf(v) || isnan(v)) {
|
|
||||||
ret = -1;
|
|
||||||
} else if ((issigned && (v < INT64_MIN || v > INT64_MAX)) || ((!issigned) && (v < 0 || v > UINT64_MAX))) {
|
|
||||||
ret = -1;
|
|
||||||
} else {
|
|
||||||
*value = (int64_t) round(v);
|
|
||||||
}
|
|
||||||
|
|
||||||
errno = 0;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t radix = 10;
|
|
||||||
if (type == TK_HEX) {
|
|
||||||
radix = 16;
|
|
||||||
} else if (type == TK_BIN) {
|
|
||||||
radix = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// the string may be overflow according to errno
|
|
||||||
if (!issigned) {
|
|
||||||
const char *p = z;
|
|
||||||
while(*p != 0 && *p == ' ') p++;
|
|
||||||
if (*p != 0 && *p == '-') { return -1;}
|
|
||||||
|
|
||||||
*value = strtoull(z, &endPtr, radix);
|
|
||||||
} else {
|
|
||||||
*value = strtoll(z, &endPtr, radix);
|
|
||||||
}
|
|
||||||
|
|
||||||
// not a valid integer number, return error
|
|
||||||
if (endPtr - z != n || errno == ERANGE) {
|
|
||||||
ret = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
errno = 0;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
||||||
SToken sToken;
|
SToken sToken;
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
|
@ -159,10 +101,8 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||||
|
|
||||||
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
|
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
tNameExtractFullName(&name, tableName);
|
tNameExtractFullName(&name, tableName);
|
||||||
|
|
||||||
SParseBasicCtx* pBasicCtx = &pCxt->pComCxt->ctx;
|
SParseBasicCtx* pBasicCtx = &pCxt->pComCxt->ctx;
|
||||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
||||||
|
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||||
|
@ -349,207 +289,6 @@ static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
//static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
|
||||||
// if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
|
|
||||||
// type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
|
|
||||||
// (pToken->n == 0) || (type == TK_RP)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Remove quotation marks
|
|
||||||
// if (TK_STRING == type) {
|
|
||||||
// if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // delete escape character: \\, \', \"
|
|
||||||
// char delim = pToken->z[0];
|
|
||||||
// int32_t cnt = 0;
|
|
||||||
// int32_t j = 0;
|
|
||||||
// for (uint32_t k = 1; k < pToken->n - 1; ++k) {
|
|
||||||
// if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
|
|
||||||
// tmpTokenBuf[j] = pToken->z[k + 1];
|
|
||||||
// cnt++;
|
|
||||||
// j++;
|
|
||||||
// k++;
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
// tmpTokenBuf[j] = pToken->z[k];
|
|
||||||
// j++;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// tmpTokenBuf[j] = 0;
|
|
||||||
// pToken->z = tmpTokenBuf;
|
|
||||||
// pToken->n -= 2 + cnt;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return TSDB_CODE_SUCCESS;
|
|
||||||
//}
|
|
||||||
|
|
||||||
//static FORCE_INLINE int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
|
|
||||||
// int64_t iv;
|
|
||||||
// char *endptr = NULL;
|
|
||||||
// bool isSigned = false;
|
|
||||||
//
|
|
||||||
// CHECK_CODE(checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf));
|
|
||||||
//
|
|
||||||
// if (isNullStr(pToken)) {
|
|
||||||
// if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
|
|
||||||
// int64_t tmpVal = 0;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return func(getNullValue(pSchema->type), 0, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// switch (pSchema->type) {
|
|
||||||
// case TSDB_DATA_TYPE_BOOL: {
|
|
||||||
// if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
|
|
||||||
// if (strncmp(pToken->z, "true", pToken->n) == 0) {
|
|
||||||
// return func(&TRUE_VALUE, pSchema->bytes, param);
|
|
||||||
// } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
|
|
||||||
// return func(&FALSE_VALUE, pSchema->bytes, param);
|
|
||||||
// } else {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
|
|
||||||
// }
|
|
||||||
// } else if (pToken->type == TK_INTEGER) {
|
|
||||||
// return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
|
|
||||||
// } else if (pToken->type == TK_FLOAT) {
|
|
||||||
// return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
|
|
||||||
// } else {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_TINYINT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_TINYINT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// uint8_t tmpVal = (uint8_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_UTINYINT:{
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_UTINYINT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// uint8_t tmpVal = (uint8_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_SMALLINT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_SMALLINT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// int16_t tmpVal = (int16_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_USMALLINT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_USMALLINT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// uint16_t tmpVal = (uint16_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_INT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_INT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// int32_t tmpVal = (int32_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_UINT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_UINT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// uint32_t tmpVal = (uint32_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_BIGINT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_BIGINT(iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// return func(&iv, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_UBIGINT: {
|
|
||||||
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
|
|
||||||
// } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// uint64_t tmpVal = (uint64_t)iv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_FLOAT: {
|
|
||||||
// double dv;
|
|
||||||
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
|
||||||
// }
|
|
||||||
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
|
|
||||||
// }
|
|
||||||
// float tmpVal = (float)dv;
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_DOUBLE: {
|
|
||||||
// double dv;
|
|
||||||
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
|
|
||||||
// }
|
|
||||||
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
|
|
||||||
// }
|
|
||||||
// return func(&dv, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_BINARY: {
|
|
||||||
// // too long values will return invalid sql, not be truncated automatically
|
|
||||||
// if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
|
|
||||||
// }
|
|
||||||
// return func(pToken->z, pToken->n, param);
|
|
||||||
// }
|
|
||||||
// case TSDB_DATA_TYPE_NCHAR: {
|
|
||||||
// return func(pToken->z, pToken->n, param);
|
|
||||||
// }
|
|
||||||
// case TSDB_DATA_TYPE_TIMESTAMP: {
|
|
||||||
// int64_t tmpVal;
|
|
||||||
// if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
|
|
||||||
// return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
|
|
||||||
// }
|
|
||||||
// return func(&tmpVal, pSchema->bytes, param);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return TSDB_CODE_FAILED;
|
|
||||||
//}
|
|
||||||
|
|
||||||
// pSql -> tag1_name, ...)
|
// pSql -> tag1_name, ...)
|
||||||
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
|
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
|
||||||
int32_t nCols = pColList->numOfCols;
|
int32_t nCols = pColList->numOfCols;
|
||||||
|
|
|
@ -1649,9 +1649,9 @@ static bool isNullStr(SToken *pToken) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
||||||
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
|
if ((pToken->type != TK_NOW && pToken->type != TK_INTEGER && pToken->type != TK_STRING && pToken->type != TK_FLOAT && pToken->type != TK_BOOL &&
|
||||||
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
|
pToken->type != TK_NULL && pToken->type != TK_HEX && pToken->type != TK_OCT && pToken->type != TK_BIN) ||
|
||||||
(pToken->n == 0) || (type == TK_RP)) {
|
(pToken->n == 0) || (pToken->type == TK_RP)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1795,7 +1795,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_TINYINT: {
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
|
||||||
} else if (!IS_VALID_TINYINT(iv)) {
|
} else if (!IS_VALID_TINYINT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
|
||||||
|
@ -1806,7 +1806,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_UTINYINT:{
|
case TSDB_DATA_TYPE_UTINYINT:{
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
|
||||||
} else if (!IS_VALID_UTINYINT(iv)) {
|
} else if (!IS_VALID_UTINYINT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
|
||||||
|
@ -1816,7 +1816,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT: {
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
|
||||||
} else if (!IS_VALID_SMALLINT(iv)) {
|
} else if (!IS_VALID_SMALLINT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
|
||||||
|
@ -1826,7 +1826,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_USMALLINT: {
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
|
||||||
} else if (!IS_VALID_USMALLINT(iv)) {
|
} else if (!IS_VALID_USMALLINT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
|
||||||
|
@ -1836,7 +1836,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_INT: {
|
case TSDB_DATA_TYPE_INT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
|
||||||
} else if (!IS_VALID_INT(iv)) {
|
} else if (!IS_VALID_INT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
|
||||||
|
@ -1846,7 +1846,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_UINT: {
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
|
||||||
} else if (!IS_VALID_UINT(iv)) {
|
} else if (!IS_VALID_UINT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
|
||||||
|
@ -1856,7 +1856,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_BIGINT: {
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
|
||||||
} else if (!IS_VALID_BIGINT(iv)) {
|
} else if (!IS_VALID_BIGINT(iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
|
||||||
|
@ -1865,7 +1865,7 @@ int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t ti
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_UBIGINT: {
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
|
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, 10, &iv, &isSigned)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
|
||||||
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
|
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
|
||||||
return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
|
return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
|
||||||
|
|
|
@ -97,8 +97,8 @@ public:
|
||||||
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
|
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
|
||||||
std::unique_ptr<STableMeta> table;
|
std::unique_ptr<STableMeta> table;
|
||||||
|
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_NAME_LEN] = {0};
|
||||||
tNameGetFullDbName(pTableName, db);
|
tNameGetDbName(pTableName, db);
|
||||||
|
|
||||||
const char* tname = tNameGetTableName(pTableName);
|
const char* tname = tNameGetTableName(pTableName);
|
||||||
int32_t code = copyTableSchemaMeta(db, tname, &table);
|
int32_t code = copyTableSchemaMeta(db, tname, &table);
|
||||||
|
@ -111,6 +111,7 @@ public:
|
||||||
|
|
||||||
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
||||||
// todo
|
// todo
|
||||||
|
vgInfo->vgId = 1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -207,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||||
}
|
}
|
||||||
taosArrayPush(currentLevel, &subplan);
|
taosArrayPush(currentLevel, &subplan);
|
||||||
pCxt->pCurrentSubplan = subplan;
|
pCxt->pCurrentSubplan = subplan;
|
||||||
|
++(pCxt->pDag->numOfSubplans);
|
||||||
return subplan;
|
return subplan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,11 +294,14 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
|
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
|
||||||
size_t numOfVg = taosArrayGetSize(vgs);
|
size_t numOfVg = taosArrayGetSize(vgs);
|
||||||
for (int32_t i = 0; i < numOfVg; ++i) {
|
for (int32_t i = 0; i < numOfVg; ++i) {
|
||||||
|
STORE_CURRENT_SUBPLAN(pCxt);
|
||||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
||||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
|
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
|
||||||
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
|
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
|
||||||
subplan->pNode = NULL;
|
subplan->pNode = NULL;
|
||||||
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
||||||
|
subplan->type = QUERY_TYPE_MODIFY;
|
||||||
|
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -166,7 +166,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t n = 0; n < levelPlanNum; ++n) {
|
for (int32_t n = 0; n < levelPlanNum; ++n) {
|
||||||
SSubplan *plan = taosArrayGet(levelPlans, n);
|
SSubplan *plan = taosArrayGetP(levelPlans, n);
|
||||||
SSchTask task = {0};
|
SSchTask task = {0};
|
||||||
|
|
||||||
if (plan->type == QUERY_TYPE_MODIFY) {
|
if (plan->type == QUERY_TYPE_MODIFY) {
|
||||||
|
|
Loading…
Reference in New Issue