TD-11819 Parsing insert statement and assembling binary objects.
This commit is contained in:
parent
8c7fd12ce4
commit
5e981e43ad
|
@ -12,6 +12,7 @@ debug/
|
||||||
release/
|
release/
|
||||||
target/
|
target/
|
||||||
debs/
|
debs/
|
||||||
|
deps/
|
||||||
rpms/
|
rpms/
|
||||||
mac/
|
mac/
|
||||||
*.pyc
|
*.pyc
|
||||||
|
|
|
@ -44,10 +44,10 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
||||||
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
|
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
|
||||||
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
|
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
|
||||||
|
|
||||||
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
|
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
|
||||||
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
|
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
|
||||||
|
|
||||||
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
|
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
|
||||||
void deltaToUtcInitOnce();
|
void deltaToUtcInitOnce();
|
||||||
|
|
||||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
|
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
|
||||||
|
|
|
@ -40,7 +40,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool*
|
||||||
|
|
||||||
bool taosVariantIsValid(SVariant *pVar);
|
bool taosVariantIsValid(SVariant *pVar);
|
||||||
|
|
||||||
void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type);
|
void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type);
|
||||||
|
|
||||||
void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uint32_t type);
|
void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uint32_t type);
|
||||||
|
|
||||||
|
|
|
@ -131,6 +131,18 @@ struct SInsertStmtInfo;
|
||||||
*/
|
*/
|
||||||
bool qIsInsertSql(const char* pStr, size_t length);
|
bool qIsInsertSql(const char* pStr, size_t length);
|
||||||
|
|
||||||
|
typedef struct SParseContext {
|
||||||
|
const char* pSql; // sql string
|
||||||
|
size_t sqlLen; // length of the sql string
|
||||||
|
int64_t id; // operator id, generated by uuid generator
|
||||||
|
const char* pDbname;
|
||||||
|
const SEpSet* pEpSet;
|
||||||
|
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
||||||
|
|
||||||
|
char* pMsg; // extended error message if exists to help avoid the problem in sql statement.
|
||||||
|
int32_t msgLen; // max length of the msg
|
||||||
|
} SParseContext;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse the sql statement and then return the SQueryStmtInfo as the result of bounded AST.
|
* Parse the sql statement and then return the SQueryStmtInfo as the result of bounded AST.
|
||||||
* @param pSql sql string
|
* @param pSql sql string
|
||||||
|
@ -141,16 +153,29 @@ bool qIsInsertSql(const char* pStr, size_t length);
|
||||||
*/
|
*/
|
||||||
int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** pQueryInfo, int64_t id, char* msg, int32_t msgLen);
|
int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo** pQueryInfo, int64_t id, char* msg, int32_t msgLen);
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
PAYLOAD_TYPE_KV = 0,
|
||||||
|
PAYLOAD_TYPE_RAW = 1,
|
||||||
|
} EPayloadType;
|
||||||
|
|
||||||
|
typedef struct SInsertStmtInfo {
|
||||||
|
// SHashObj* pTableBlockHashList; // data block for each table
|
||||||
|
SArray* pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
|
||||||
|
int8_t schemaAttache; // denote if submit block is built with table schema or not
|
||||||
|
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||||
|
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||||
|
const char* sql; // current sql statement position
|
||||||
|
} SInsertStmtInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse the insert sql statement.
|
* Parse the insert sql statement.
|
||||||
* @param pStr sql string
|
* @param pStr sql string
|
||||||
* @param length length of the sql string
|
* @param length length of the sql string
|
||||||
* @param pInsertParam data in binary format to submit to vnode directly.
|
|
||||||
* @param id operator id, generated by uuid generator.
|
* @param id operator id, generated by uuid generator.
|
||||||
* @param msg extended error message if exists to help avoid the problem in sql statement.
|
* @param msg extended error message if exists to help avoid the problem in sql statement.
|
||||||
* @return
|
* @return data in binary format to submit to vnode directly.
|
||||||
*/
|
*/
|
||||||
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen);
|
int32_t qParseInsertSql(SParseContext* pContext, struct SInsertStmtInfo** pInfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
|
||||||
|
|
|
@ -38,11 +38,11 @@ extern "C" {
|
||||||
(dst)[(size)-1] = 0; \
|
(dst)[(size)-1] = 0; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
int64_t taosStr2int64(char *str);
|
int64_t taosStr2int64(const char *str);
|
||||||
|
|
||||||
// USE_LIBICONV
|
// USE_LIBICONV
|
||||||
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
|
int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
|
||||||
bool taosMbsToUcs4(char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
|
bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
|
||||||
int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
|
int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
|
||||||
bool taosValidateEncodec(const char *encodec);
|
bool taosValidateEncodec(const char *encodec);
|
||||||
char * taosCharsetReplace(char *charsetstr);
|
char * taosCharsetReplace(char *charsetstr);
|
||||||
|
|
|
@ -29,7 +29,7 @@ int32_t strdequote(char *src);
|
||||||
int32_t strndequote(char *dst, const char* z, int32_t len);
|
int32_t strndequote(char *dst, const char* z, int32_t len);
|
||||||
int32_t strRmquote(char *z, int32_t len);
|
int32_t strRmquote(char *z, int32_t len);
|
||||||
size_t strtrim(char *src);
|
size_t strtrim(char *src);
|
||||||
char * strnchr(char *haystack, char needle, int32_t len, bool skipquote);
|
char * strnchr(const char *haystack, char needle, int32_t len, bool skipquote);
|
||||||
char ** strsplit(char *src, const char *delim, int32_t *num);
|
char ** strsplit(char *src, const char *delim, int32_t *num);
|
||||||
char * strtolower(char *dst, const char *src);
|
char * strtolower(char *dst, const char *src);
|
||||||
char * strntolower(char *dst, const char *src, int32_t n);
|
char * strntolower(char *dst, const char *src, int32_t n);
|
||||||
|
|
|
@ -82,18 +82,18 @@ void deltaToUtcInitOnce() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
|
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
|
||||||
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim);
|
static int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim);
|
||||||
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
|
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
|
||||||
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec);
|
static int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec);
|
||||||
static char* forwardToTimeStringEnd(char* str);
|
static char* forwardToTimeStringEnd(char* str);
|
||||||
static bool checkTzPresent(char *str, int32_t len);
|
static bool checkTzPresent(const char *str, int32_t len);
|
||||||
|
|
||||||
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
|
static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t timePrec) = {
|
||||||
parseLocaltime,
|
parseLocaltime,
|
||||||
parseLocaltimeDst
|
parseLocaltimeDst
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
|
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) {
|
||||||
/* parse datatime string in with tz */
|
/* parse datatime string in with tz */
|
||||||
if (strnchr(timestr, 'T', len, false) != NULL) {
|
if (strnchr(timestr, 'T', len, false) != NULL) {
|
||||||
return parseTimeWithTz(timestr, time, timePrec, 'T');
|
return parseTimeWithTz(timestr, time, timePrec, 'T');
|
||||||
|
@ -104,7 +104,7 @@ int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePre
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool checkTzPresent(char *str, int32_t len) {
|
bool checkTzPresent(const char *str, int32_t len) {
|
||||||
char *seg = forwardToTimeStringEnd(str);
|
char *seg = forwardToTimeStringEnd(str);
|
||||||
int32_t seg_len = len - (int32_t)(seg - str);
|
int32_t seg_len = len - (int32_t)(seg - str);
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) {
|
||||||
* 2013-04-12T15:52:01+0800
|
* 2013-04-12T15:52:01+0800
|
||||||
* 2013-04-12T15:52:01.123+0800
|
* 2013-04-12T15:52:01.123+0800
|
||||||
*/
|
*/
|
||||||
int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec, char delim) {
|
int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) {
|
||||||
|
|
||||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||||
|
@ -432,7 +432,7 @@ static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t time
|
||||||
* d - Days (24 hours)
|
* d - Days (24 hours)
|
||||||
* w - Weeks (7 days)
|
* w - Weeks (7 days)
|
||||||
*/
|
*/
|
||||||
int32_t parseAbsoluteDuration(char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) {
|
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* duration, char* unit, int32_t timePrecision) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
char* endPtr = NULL;
|
char* endPtr = NULL;
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ int32_t toInteger(const char* z, int32_t n, int32_t base, int64_t* value, bool*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosVariantCreate(SVariant *pVar, char* z, int32_t n, int32_t type) {
|
void taosVariantCreate(SVariant *pVar, const char* z, int32_t n, int32_t type) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
memset(pVar, 0, sizeof(SVariant));
|
memset(pVar, 0, sizeof(SVariant));
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,204 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_DATABLOCKMGT_H
|
||||||
|
#define TDENGINE_DATABLOCKMGT_H
|
||||||
|
|
||||||
|
#include "catalog.h"
|
||||||
|
#include "os.h"
|
||||||
|
#include "ttypes.h"
|
||||||
|
#include "tname.h"
|
||||||
|
|
||||||
|
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
|
||||||
|
|
||||||
|
typedef enum EOrderStatus {
|
||||||
|
ORDER_STATUS_UNKNOWN = 0,
|
||||||
|
ORDER_STATUS_ORDERED = 1,
|
||||||
|
ORDER_STATUS_DISORDERED = 2,
|
||||||
|
} EOrderStatus;
|
||||||
|
|
||||||
|
typedef enum EValStat {
|
||||||
|
VAL_STAT_HAS = 0x0, // 0 means has val
|
||||||
|
VAL_STAT_NONE = 0x01, // 1 means no val
|
||||||
|
} EValStat;
|
||||||
|
|
||||||
|
typedef enum ERowCompareStat {
|
||||||
|
ROW_COMPARE_NO_NEED = 0,
|
||||||
|
ROW_COMPARE_NEED = 1,
|
||||||
|
} ERowCompareStat;
|
||||||
|
|
||||||
|
typedef struct SBoundColumn {
|
||||||
|
int32_t offset; // all column offset value
|
||||||
|
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
|
||||||
|
uint8_t valStat; // denote if current column bound or not(0 means has val, 1 means no val)
|
||||||
|
} SBoundColumn;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint16_t schemaColIdx;
|
||||||
|
uint16_t boundIdx;
|
||||||
|
uint16_t finalIdx;
|
||||||
|
} SBoundIdxInfo;
|
||||||
|
|
||||||
|
typedef struct SParsedDataColInfo {
|
||||||
|
int16_t numOfCols;
|
||||||
|
int16_t numOfBound;
|
||||||
|
uint16_t flen; // TODO: get from STSchema
|
||||||
|
uint16_t allNullLen; // TODO: get from STSchema
|
||||||
|
uint16_t extendedVarLen;
|
||||||
|
int32_t *boundedColumns; // bound column idx according to schema
|
||||||
|
SBoundColumn *cols;
|
||||||
|
SBoundIdxInfo *colIdxInfo;
|
||||||
|
int8_t orderStatus; // bound columns
|
||||||
|
} SParsedDataColInfo;
|
||||||
|
|
||||||
|
typedef struct SParamInfo {
|
||||||
|
int32_t idx;
|
||||||
|
uint8_t type;
|
||||||
|
uint8_t timePrec;
|
||||||
|
int16_t bytes;
|
||||||
|
uint32_t offset;
|
||||||
|
} SParamInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dataLen; // len of SDataRow
|
||||||
|
int32_t kvLen; // len of SKVRow
|
||||||
|
} SMemRowInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint8_t memRowType; // default is 0, that is SDataRow
|
||||||
|
uint8_t compareStat; // 0 no need, 1 need compare
|
||||||
|
TDRowTLenT kvRowInitLen;
|
||||||
|
SMemRowInfo *rowInfo;
|
||||||
|
} SMemRowBuilder;
|
||||||
|
|
||||||
|
typedef struct SBlockKeyTuple {
|
||||||
|
TSKEY skey;
|
||||||
|
void* payloadAddr;
|
||||||
|
} SBlockKeyTuple;
|
||||||
|
|
||||||
|
typedef struct SBlockKeyInfo {
|
||||||
|
int32_t maxBytesAlloc;
|
||||||
|
SBlockKeyTuple* pKeyTuple;
|
||||||
|
} SBlockKeyInfo;
|
||||||
|
|
||||||
|
typedef struct STableDataBlocks {
|
||||||
|
SName tableName;
|
||||||
|
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||||
|
bool ordered; // if current rows are ordered or not
|
||||||
|
int64_t vgId; // virtual group id
|
||||||
|
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
|
||||||
|
int32_t numOfTables; // number of tables in current submit block
|
||||||
|
int32_t rowSize; // row size for current table
|
||||||
|
uint32_t nAllocSize;
|
||||||
|
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
|
||||||
|
uint32_t size;
|
||||||
|
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
|
||||||
|
char *pData;
|
||||||
|
bool cloned;
|
||||||
|
STagData tagData;
|
||||||
|
|
||||||
|
SParsedDataColInfo boundColumnInfo;
|
||||||
|
|
||||||
|
// for parameter ('?') binding
|
||||||
|
uint32_t numOfAllocedParams;
|
||||||
|
uint32_t numOfParams;
|
||||||
|
SParamInfo * params;
|
||||||
|
SMemRowBuilder rowBuilder;
|
||||||
|
} STableDataBlocks;
|
||||||
|
|
||||||
|
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
|
||||||
|
memRowSetType(row, memRowType);
|
||||||
|
if (isDataRowT(memRowType)) {
|
||||||
|
dataRowSetVersion(memRowDataBody(row), pBlock->pTableMeta->sversion);
|
||||||
|
dataRowSetLen(memRowDataBody(row), (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBlock->boundColumnInfo.flen));
|
||||||
|
} else {
|
||||||
|
ASSERT(nBoundCols > 0);
|
||||||
|
memRowSetKvVersion(row, pBlock->pTableMeta->sversion);
|
||||||
|
kvRowSetNCols(memRowKvBody(row), nBoundCols);
|
||||||
|
kvRowSetLen(memRowKvBody(row), (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
|
||||||
|
ASSERT(pBlock->rowSize == pBlock->pTableMeta->tableInfo.rowSize);
|
||||||
|
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Applicable to consume by one row
|
||||||
|
static FORCE_INLINE void appendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||||
|
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
|
||||||
|
uint8_t compareStat) {
|
||||||
|
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
|
||||||
|
if (compareStat == ROW_COMPARE_NEED) {
|
||||||
|
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd,
|
||||||
|
int32_t idx, int32_t *toffset, int16_t *colId) {
|
||||||
|
int32_t schemaIdx = 0;
|
||||||
|
if (IS_DATA_COL_ORDERED(spd)) {
|
||||||
|
schemaIdx = spd->boundedColumns[idx];
|
||||||
|
if (isDataRowT(memRowType)) {
|
||||||
|
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
|
||||||
|
} else {
|
||||||
|
*toffset = idx * sizeof(SColIdx); // the offset of SColIdx
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
|
||||||
|
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
|
||||||
|
if (isDataRowT(memRowType)) {
|
||||||
|
*toffset = (spd->cols + schemaIdx)->toffset;
|
||||||
|
} else {
|
||||||
|
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*colId = pSchema[schemaIdx].colId;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
|
||||||
|
if (isDataRow(row)) {
|
||||||
|
if (kvLen < (dataLen * KVRatioConvert)) {
|
||||||
|
memRowSetConvert(row);
|
||||||
|
}
|
||||||
|
} else if (kvLen > dataLen) {
|
||||||
|
memRowSetConvert(row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||||
|
pBlocks->tid = pTableMeta->suid;
|
||||||
|
pBlocks->uid = pTableMeta->uid;
|
||||||
|
pBlocks->sversion = pTableMeta->sversion;
|
||||||
|
|
||||||
|
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
|
} else {
|
||||||
|
pBlocks->numOfRows += numOfRows;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
|
||||||
|
int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
||||||
|
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
|
||||||
|
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
|
||||||
|
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
|
||||||
|
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
|
||||||
|
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||||
|
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
|
||||||
|
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap);
|
||||||
|
|
||||||
|
#endif // TDENGINE_DATABLOCKMGT_H
|
|
@ -16,4 +16,8 @@
|
||||||
#ifndef TDENGINE_INSERTPARSER_H
|
#ifndef TDENGINE_INSERTPARSER_H
|
||||||
#define TDENGINE_INSERTPARSER_H
|
#define TDENGINE_INSERTPARSER_H
|
||||||
|
|
||||||
|
#include "parser.h"
|
||||||
|
|
||||||
|
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
|
||||||
|
|
||||||
#endif // TDENGINE_INSERTPARSER_H
|
#endif // TDENGINE_INSERTPARSER_H
|
||||||
|
|
|
@ -26,14 +26,6 @@ extern "C" {
|
||||||
|
|
||||||
struct SSqlNode;
|
struct SSqlNode;
|
||||||
|
|
||||||
typedef struct SInsertStmtInfo {
|
|
||||||
SHashObj *pTableBlockHashList; // data block for each table
|
|
||||||
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
|
|
||||||
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
|
||||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
|
||||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
|
||||||
char *sql; // current sql statement position
|
|
||||||
} SInsertStmtInfo;
|
|
||||||
|
|
||||||
typedef struct SInternalField {
|
typedef struct SInternalField {
|
||||||
TAOS_FIELD field;
|
TAOS_FIELD field;
|
||||||
|
|
|
@ -46,7 +46,7 @@ SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index);
|
||||||
|
|
||||||
int32_t parserValidateIdToken(SToken* pToken);
|
int32_t parserValidateIdToken(SToken* pToken);
|
||||||
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
|
int32_t buildInvalidOperationMsg(SMsgBuf* pMsgBuf, const char* msg);
|
||||||
int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr);
|
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr);
|
||||||
|
|
||||||
STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo);
|
STableMetaInfo* addEmptyMetaInfo(SQueryStmtInfo* pQueryInfo);
|
||||||
|
|
||||||
|
@ -61,6 +61,8 @@ void cleanupColumnCond(SArray** pCond);
|
||||||
uint32_t convertRelationalOperator(SToken *pToken);
|
uint32_t convertRelationalOperator(SToken *pToken);
|
||||||
int32_t getExprFunctionId(SExprInfo *pExprInfo);
|
int32_t getExprFunctionId(SExprInfo *pExprInfo);
|
||||||
|
|
||||||
|
STableMeta* tableMetaDup(const STableMeta* pTableMeta);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -44,7 +44,7 @@ typedef struct SToken {
|
||||||
* @param tokenType
|
* @param tokenType
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
uint32_t tGetToken(char *z, uint32_t *tokenType);
|
uint32_t tGetToken(const char *z, uint32_t *tokenType);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* enhanced tokenizer for sql string.
|
* enhanced tokenizer for sql string.
|
||||||
|
@ -54,7 +54,7 @@ uint32_t tGetToken(char *z, uint32_t *tokenType);
|
||||||
* @param isPrevOptr
|
* @param isPrevOptr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SToken tStrGetToken(char *str, int32_t *i, bool isPrevOptr);
|
SToken tStrGetToken(const char *str, int32_t *i, bool isPrevOptr);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* check if it is a keyword or not
|
* check if it is a keyword or not
|
||||||
|
|
|
@ -191,7 +191,7 @@ tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType) {
|
||||||
pExpr->type = SQL_NODE_EXPR;
|
pExpr->type = SQL_NODE_EXPR;
|
||||||
|
|
||||||
if (pLeft != NULL && pRight != NULL && (optrType != TK_IN)) {
|
if (pLeft != NULL && pRight != NULL && (optrType != TK_IN)) {
|
||||||
char* endPos = pRight->exprToken.z + pRight->exprToken.n;
|
const char* endPos = pRight->exprToken.z + pRight->exprToken.n;
|
||||||
pExpr->exprToken.z = pLeft->exprToken.z;
|
pExpr->exprToken.z = pLeft->exprToken.z;
|
||||||
pExpr->exprToken.n = (uint32_t)(endPos - pExpr->exprToken.z);
|
pExpr->exprToken.n = (uint32_t)(endPos - pExpr->exprToken.z);
|
||||||
pExpr->exprToken.type = pLeft->exprToken.type;
|
pExpr->exprToken.type = pLeft->exprToken.type;
|
||||||
|
|
|
@ -0,0 +1,685 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "dataBlockMgt.h"
|
||||||
|
|
||||||
|
// #include "astGenerator.h"
|
||||||
|
// #include "parserInt.h"
|
||||||
|
#include "catalog.h"
|
||||||
|
#include "parserUtil.h"
|
||||||
|
#include "queryInfoUtil.h"
|
||||||
|
// #include "ttoken.h"
|
||||||
|
// #include "function.h"
|
||||||
|
// #include "ttime.h"
|
||||||
|
// #include "tglobal.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
|
||||||
|
#define IS_RAW_PAYLOAD(t) \
|
||||||
|
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||||
|
|
||||||
|
static int32_t rowDataCompar(const void *lhs, const void *rhs) {
|
||||||
|
TSKEY left = *(TSKEY *)lhs;
|
||||||
|
TSKEY right = *(TSKEY *)rhs;
|
||||||
|
|
||||||
|
if (left == right) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return left > right ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols) {
|
||||||
|
pColList->numOfCols = numOfCols;
|
||||||
|
pColList->numOfBound = numOfCols;
|
||||||
|
pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
|
||||||
|
pColList->boundedColumns = calloc(pColList->numOfCols, sizeof(int32_t));
|
||||||
|
pColList->cols = calloc(pColList->numOfCols, sizeof(SBoundColumn));
|
||||||
|
pColList->colIdxInfo = NULL;
|
||||||
|
pColList->flen = 0;
|
||||||
|
pColList->allNullLen = 0;
|
||||||
|
|
||||||
|
int32_t nVar = 0;
|
||||||
|
for (int32_t i = 0; i < pColList->numOfCols; ++i) {
|
||||||
|
uint8_t type = pSchema[i].type;
|
||||||
|
if (i > 0) {
|
||||||
|
pColList->cols[i].offset = pColList->cols[i - 1].offset + pSchema[i - 1].bytes;
|
||||||
|
pColList->cols[i].toffset = pColList->flen;
|
||||||
|
}
|
||||||
|
pColList->flen += TYPE_BYTES[type];
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
pColList->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||||
|
++nVar;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
pColList->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||||
|
++nVar;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pColList->boundedColumns[i] = pSchema[i].colId;
|
||||||
|
}
|
||||||
|
pColList->allNullLen += pColList->flen;
|
||||||
|
pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
|
||||||
|
uint16_t left = *(uint16_t *)lhs;
|
||||||
|
uint16_t right = *(uint16_t *)rhs;
|
||||||
|
|
||||||
|
if (left == right) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return left > right ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t boundIdxCompar(const void *lhs, const void *rhs) {
|
||||||
|
uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t));
|
||||||
|
uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t));
|
||||||
|
|
||||||
|
if (left == right) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return left > right ? 1 : -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyBoundColumnInfo(SParsedDataColInfo* pColList) {
|
||||||
|
tfree(pColList->boundedColumns);
|
||||||
|
tfree(pColList->cols);
|
||||||
|
tfree(pColList->colIdxInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
|
||||||
|
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
|
||||||
|
STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
|
||||||
|
if (dataBuf == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBuf->nAllocSize = (uint32_t)defaultSize;
|
||||||
|
dataBuf->headerSize = startOffset;
|
||||||
|
|
||||||
|
// the header size will always be the startOffset value, reserved for the subumit block header
|
||||||
|
if (dataBuf->nAllocSize <= dataBuf->headerSize) {
|
||||||
|
dataBuf->nAllocSize = dataBuf->headerSize * 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
//dataBuf->pData = calloc(1, dataBuf->nAllocSize);
|
||||||
|
dataBuf->pData = malloc(dataBuf->nAllocSize);
|
||||||
|
if (dataBuf->pData == NULL) {
|
||||||
|
tfree(dataBuf);
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
|
||||||
|
|
||||||
|
//Here we keep the tableMeta to avoid it to be remove by other threads.
|
||||||
|
dataBuf->pTableMeta = tableMetaDup(pTableMeta);
|
||||||
|
|
||||||
|
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
|
||||||
|
SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);
|
||||||
|
setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
|
||||||
|
|
||||||
|
dataBuf->ordered = true;
|
||||||
|
dataBuf->prevTS = INT64_MIN;
|
||||||
|
dataBuf->rowSize = rowSize;
|
||||||
|
dataBuf->size = startOffset;
|
||||||
|
dataBuf->tsSource = -1;
|
||||||
|
dataBuf->vgId = dataBuf->pTableMeta->vgId;
|
||||||
|
|
||||||
|
tNameAssign(&dataBuf->tableName, name);
|
||||||
|
|
||||||
|
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
|
||||||
|
|
||||||
|
*dataBlocks = dataBuf;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||||
|
SName* name, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
|
||||||
|
SArray* pBlockList) {
|
||||||
|
*dataBlocks = NULL;
|
||||||
|
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
|
||||||
|
if (t1 != NULL) {
|
||||||
|
*dataBlocks = *t1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*dataBlocks == NULL) {
|
||||||
|
int32_t ret = createDataBlock((size_t)size, rowSize, startOffset, name, pTableMeta, dataBlocks);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
|
||||||
|
if (pBlockList) {
|
||||||
|
taosArrayPush(pBlockList, dataBlocks);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||||
|
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
|
||||||
|
int32_t columns = getNumOfColumns(pTableMeta);
|
||||||
|
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||||
|
for (int32_t i = 0; i < columns; i++) {
|
||||||
|
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
|
||||||
|
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: Move to tdataformat.h and refactor when STSchema available.
|
||||||
|
* - fetch flen and toffset from STSChema and remove param spd
|
||||||
|
*/
|
||||||
|
static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols,
|
||||||
|
SParsedDataColInfo *spd) {
|
||||||
|
ASSERT(isKvRow(src));
|
||||||
|
SKVRow kvRow = memRowKvBody(src);
|
||||||
|
SDataRow dataRow = memRowDataBody(dest);
|
||||||
|
|
||||||
|
memRowSetType(dest, SMEM_ROW_DATA);
|
||||||
|
dataRowSetVersion(dataRow, memRowKvVersion(src));
|
||||||
|
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + spd->flen));
|
||||||
|
|
||||||
|
int32_t kvIdx = 0;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
SSchema *schema = pSchema + i;
|
||||||
|
void * val = tdGetKVRowValOfColEx(kvRow, schema->colId, &kvIdx);
|
||||||
|
tdAppendDataColVal(dataRow, val != NULL ? val : getNullValue(schema->type), true, schema->type,
|
||||||
|
(spd->cols + i)->toffset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Move to tdataformat.h and refactor when STSchema available.
|
||||||
|
static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols,
|
||||||
|
SParsedDataColInfo *spd) {
|
||||||
|
ASSERT(isDataRow(src));
|
||||||
|
|
||||||
|
SDataRow dataRow = memRowDataBody(src);
|
||||||
|
SKVRow kvRow = memRowKvBody(dest);
|
||||||
|
|
||||||
|
memRowSetType(dest, SMEM_ROW_KV);
|
||||||
|
memRowSetKvVersion(kvRow, dataRowVersion(dataRow));
|
||||||
|
kvRowSetNCols(kvRow, nBoundCols);
|
||||||
|
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
|
||||||
|
|
||||||
|
int32_t toffset = 0, kvOffset = 0;
|
||||||
|
for (int i = 0; i < nCols; ++i) {
|
||||||
|
if ((spd->cols + i)->valStat == VAL_STAT_HAS) {
|
||||||
|
SSchema *schema = pSchema + i;
|
||||||
|
toffset = (spd->cols + i)->toffset;
|
||||||
|
void *val = tdGetRowDataOfCol(dataRow, schema->type, toffset + TD_DATA_ROW_HEAD_SIZE);
|
||||||
|
tdAppendKvColVal(kvRow, val, true, schema->colId, schema->type, kvOffset);
|
||||||
|
kvOffset += sizeof(SColIdx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Move to tdataformat.h and refactor when STSchema available.
|
||||||
|
static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlocks *pBlock) {
|
||||||
|
STableMeta * pTableMeta = pBlock->pTableMeta;
|
||||||
|
STableComInfo tinfo = getTableInfo(pTableMeta);
|
||||||
|
SSchema * pSchema = getTableColumnSchema(pTableMeta);
|
||||||
|
SParsedDataColInfo *spd = &pBlock->boundColumnInfo;
|
||||||
|
|
||||||
|
ASSERT(dest != src);
|
||||||
|
|
||||||
|
if (isDataRow(src)) {
|
||||||
|
// TODO: Can we use pBlock -> numOfParam directly?
|
||||||
|
ASSERT(spd->numOfBound > 0);
|
||||||
|
convertToSKVRow(dest, src, pSchema, tinfo.numOfColumns, spd->numOfBound, spd);
|
||||||
|
} else {
|
||||||
|
convertToSDataRow(dest, src, pSchema, tinfo.numOfColumns, spd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
|
||||||
|
if (pDataBlock == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pDataBlock->pData);
|
||||||
|
|
||||||
|
if (removeMeta) {
|
||||||
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
tNameExtractFullName(&pDataBlock->tableName, name);
|
||||||
|
|
||||||
|
// taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pDataBlock->cloned) {
|
||||||
|
tfree(pDataBlock->params);
|
||||||
|
|
||||||
|
// free the refcount for metermeta
|
||||||
|
if (pDataBlock->pTableMeta != NULL) {
|
||||||
|
tfree(pDataBlock->pTableMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pDataBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* destroyBlockArrayList(SArray* pDataBlockList) {
|
||||||
|
if (pDataBlockList == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size = taosArrayGetSize(pDataBlockList);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
void* d = taosArrayGetP(pDataBlockList, i);
|
||||||
|
destroyDataBlock(d, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pDataBlockList);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// data block is disordered, sort it in ascending order
|
||||||
|
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
|
||||||
|
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
|
||||||
|
|
||||||
|
// size is less than the total size, since duplicated rows may be removed yet.
|
||||||
|
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
|
||||||
|
|
||||||
|
if (!dataBuf->ordered) {
|
||||||
|
char *pBlockData = pBlocks->data;
|
||||||
|
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
int32_t j = 1;
|
||||||
|
|
||||||
|
while (j < pBlocks->numOfRows) {
|
||||||
|
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
|
||||||
|
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
|
||||||
|
|
||||||
|
if (ti == tj) {
|
||||||
|
++j;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nextPos = (++i);
|
||||||
|
if (nextPos != j) {
|
||||||
|
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
++j;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBuf->ordered = true;
|
||||||
|
|
||||||
|
pBlocks->numOfRows = i + 1;
|
||||||
|
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBuf->prevTS = INT64_MIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
// data block is disordered, sort it in ascending order
|
||||||
|
int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKeyInfo) {
|
||||||
|
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
|
||||||
|
int16_t nRows = pBlocks->numOfRows;
|
||||||
|
|
||||||
|
// size is less than the total size, since duplicated rows may be removed yet.
|
||||||
|
|
||||||
|
// allocate memory
|
||||||
|
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
|
||||||
|
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
|
||||||
|
char *tmp = realloc(pBlkKeyInfo->pKeyTuple, nAlloc);
|
||||||
|
if (tmp == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
|
||||||
|
pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
|
||||||
|
}
|
||||||
|
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
|
||||||
|
|
||||||
|
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
|
||||||
|
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||||
|
char * pBlockData = pBlocks->data;
|
||||||
|
int n = 0;
|
||||||
|
while (n < nRows) {
|
||||||
|
pBlkKeyTuple->skey = memRowKey(pBlockData);
|
||||||
|
pBlkKeyTuple->payloadAddr = pBlockData;
|
||||||
|
|
||||||
|
// next loop
|
||||||
|
pBlockData += extendedRowSize;
|
||||||
|
++pBlkKeyTuple;
|
||||||
|
++n;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!dataBuf->ordered) {
|
||||||
|
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||||
|
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
|
||||||
|
|
||||||
|
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||||
|
int32_t i = 0;
|
||||||
|
int32_t j = 1;
|
||||||
|
while (j < nRows) {
|
||||||
|
TSKEY ti = (pBlkKeyTuple + i)->skey;
|
||||||
|
TSKEY tj = (pBlkKeyTuple + j)->skey;
|
||||||
|
|
||||||
|
if (ti == tj) {
|
||||||
|
++j;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t nextPos = (++i);
|
||||||
|
if (nextPos != j) {
|
||||||
|
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
|
||||||
|
}
|
||||||
|
++j;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBuf->ordered = true;
|
||||||
|
pBlocks->numOfRows = i + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
|
||||||
|
dataBuf->prevTS = INT64_MIN;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Erase the empty space reserved for binary data
|
||||||
|
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, int8_t schemaAttached, bool isRawPayload) {
|
||||||
|
// TODO: optimize this function, handle the case while binary is not presented
|
||||||
|
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
||||||
|
STableComInfo tinfo = getTableInfo(pTableMeta);
|
||||||
|
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||||
|
|
||||||
|
SSubmitBlk* pBlock = pDataBlock;
|
||||||
|
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
|
||||||
|
pDataBlock = (char*)pDataBlock + sizeof(SSubmitBlk);
|
||||||
|
|
||||||
|
int32_t flen = 0; // original total length of row
|
||||||
|
|
||||||
|
// schema needs to be included into the submit data block
|
||||||
|
if (schemaAttached) {
|
||||||
|
int32_t numOfCols = getNumOfColumns(pTableDataBlock->pTableMeta);
|
||||||
|
for(int32_t j = 0; j < numOfCols; ++j) {
|
||||||
|
STColumn* pCol = (STColumn*) pDataBlock;
|
||||||
|
pCol->colId = htons(pSchema[j].colId);
|
||||||
|
pCol->type = pSchema[j].type;
|
||||||
|
pCol->bytes = htons(pSchema[j].bytes);
|
||||||
|
pCol->offset = 0;
|
||||||
|
|
||||||
|
pDataBlock = (char*)pDataBlock + sizeof(STColumn);
|
||||||
|
flen += TYPE_BYTES[pSchema[j].type];
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schemaSize = sizeof(STColumn) * numOfCols;
|
||||||
|
pBlock->schemaLen = schemaSize;
|
||||||
|
} else {
|
||||||
|
if (isRawPayload) {
|
||||||
|
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||||
|
flen += TYPE_BYTES[pSchema[j].type];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pBlock->schemaLen = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
|
||||||
|
pBlock->dataLen = 0;
|
||||||
|
int32_t numOfRows = htons(pBlock->numOfRows);
|
||||||
|
|
||||||
|
if (isRawPayload) {
|
||||||
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
|
SMemRow memRow = (SMemRow)pDataBlock;
|
||||||
|
memRowSetType(memRow, SMEM_ROW_DATA);
|
||||||
|
SDataRow trow = memRowDataBody(memRow);
|
||||||
|
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen));
|
||||||
|
dataRowSetVersion(trow, pTableMeta->sversion);
|
||||||
|
|
||||||
|
int toffset = 0;
|
||||||
|
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
|
||||||
|
tdAppendColVal(trow, p, pSchema[j].type, toffset);
|
||||||
|
toffset += TYPE_BYTES[pSchema[j].type];
|
||||||
|
p += pSchema[j].bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDataBlock = (char*)pDataBlock + memRowTLen(memRow);
|
||||||
|
pBlock->dataLen += memRowTLen(memRow);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
|
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||||
|
if (isNeedConvertRow(payload)) {
|
||||||
|
convertSMemRow(pDataBlock, payload, pTableDataBlock);
|
||||||
|
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
|
||||||
|
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||||
|
pBlock->dataLen += rowTLen;
|
||||||
|
} else {
|
||||||
|
TDRowTLenT rowTLen = memRowTLen(payload);
|
||||||
|
memcpy(pDataBlock, payload, rowTLen);
|
||||||
|
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||||
|
pBlock->dataLen += rowTLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = pBlock->dataLen + pBlock->schemaLen;
|
||||||
|
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||||
|
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||||
|
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) {
|
||||||
|
// pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
|
||||||
|
// if (pInsertParam->pTableNameList == NULL) {
|
||||||
|
// pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
|
||||||
|
// int32_t i = 0;
|
||||||
|
// while(p1) {
|
||||||
|
// STableDataBlocks* pBlocks = *p1;
|
||||||
|
// pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
|
||||||
|
// p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if (freeBlockMap) {
|
||||||
|
// pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) {
|
||||||
|
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||||
|
int code = 0;
|
||||||
|
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||||
|
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
|
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
|
||||||
|
|
||||||
|
STableDataBlocks* pOneTableBlock = *p;
|
||||||
|
|
||||||
|
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
|
||||||
|
|
||||||
|
while(pOneTableBlock) {
|
||||||
|
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||||
|
if (pBlocks->numOfRows > 0) {
|
||||||
|
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||||
|
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
||||||
|
STableDataBlocks* dataBuf = NULL;
|
||||||
|
|
||||||
|
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||||
|
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
destroyBlockArrayList(pVnodeDataBlockList);
|
||||||
|
tfree(blkKeyInfo.pKeyTuple);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
|
||||||
|
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
|
||||||
|
|
||||||
|
if (dataBuf->nAllocSize < destSize) {
|
||||||
|
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
|
||||||
|
|
||||||
|
char* tmp = realloc(dataBuf->pData, dataBuf->nAllocSize);
|
||||||
|
if (tmp != NULL) {
|
||||||
|
dataBuf->pData = tmp;
|
||||||
|
//memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
|
||||||
|
} else { // failed to allocate memory, free already allocated memory and return error code
|
||||||
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
destroyBlockArrayList(pVnodeDataBlockList);
|
||||||
|
tfree(dataBuf->pData);
|
||||||
|
tfree(blkKeyInfo.pKeyTuple);
|
||||||
|
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isRawPayload) {
|
||||||
|
sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
|
||||||
|
} else {
|
||||||
|
if ((code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
|
||||||
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
destroyBlockArrayList(pVnodeDataBlockList);
|
||||||
|
tfree(dataBuf->pData);
|
||||||
|
tfree(blkKeyInfo.pKeyTuple);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = pBlocks->numOfRows *
|
||||||
|
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
|
||||||
|
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
|
||||||
|
|
||||||
|
pBlocks->tid = htonl(pBlocks->tid);
|
||||||
|
pBlocks->uid = htobe64(pBlocks->uid);
|
||||||
|
pBlocks->sversion = htonl(pBlocks->sversion);
|
||||||
|
pBlocks->numOfRows = htons(pBlocks->numOfRows);
|
||||||
|
pBlocks->schemaLen = 0;
|
||||||
|
|
||||||
|
// erase the empty space reserved for binary data
|
||||||
|
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, schemaAttached, isRawPayload);
|
||||||
|
assert(finalLen <= len);
|
||||||
|
|
||||||
|
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||||
|
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||||
|
|
||||||
|
// the length does not include the SSubmitBlk structure
|
||||||
|
pBlocks->dataLen = htonl(finalLen);
|
||||||
|
dataBuf->numOfTables += 1;
|
||||||
|
|
||||||
|
pBlocks->numOfRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
p = taosHashIterate(pHashObj, p);
|
||||||
|
if (p == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOneTableBlock = *p;
|
||||||
|
}
|
||||||
|
|
||||||
|
extractTableNameList(pHashObj, freeBlockMap);
|
||||||
|
|
||||||
|
// free the table data blocks;
|
||||||
|
// pInsertParam->pDataBlocks = pVnodeDataBlockList;
|
||||||
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
tfree(blkKeyInfo.pKeyTuple);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
|
||||||
|
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
|
||||||
|
const int factor = 5;
|
||||||
|
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
|
||||||
|
|
||||||
|
// expand the allocated size
|
||||||
|
if (remain < rowSize * factor) {
|
||||||
|
while (remain < rowSize * factor) {
|
||||||
|
pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
|
||||||
|
remain = pDataBlock->nAllocSize - pDataBlock->size;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *tmp = realloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
|
||||||
|
if (tmp != NULL) {
|
||||||
|
pDataBlock->pData = tmp;
|
||||||
|
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
|
||||||
|
} else {
|
||||||
|
// do nothing, if allocate more memory failed
|
||||||
|
pDataBlock->nAllocSize = nAllocSizeOld;
|
||||||
|
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen) {
|
||||||
|
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
|
||||||
|
if (nRows > 0) {
|
||||||
|
// already init(bind multiple rows by single column)
|
||||||
|
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// default compareStat is ROW_COMPARE_NO_NEED
|
||||||
|
if (nBoundCols == 0) { // file input
|
||||||
|
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
float boundRatio = ((float)nBoundCols / (float)nCols);
|
||||||
|
|
||||||
|
if (boundRatio < KVRatioKV) {
|
||||||
|
pBuilder->memRowType = SMEM_ROW_KV;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (boundRatio > KVRatioData) {
|
||||||
|
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
pBuilder->compareStat = ROW_COMPARE_NEED;
|
||||||
|
|
||||||
|
if (boundRatio < KVRatioPredict) {
|
||||||
|
pBuilder->memRowType = SMEM_ROW_KV;
|
||||||
|
} else {
|
||||||
|
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
|
||||||
|
|
||||||
|
if (nRows > 0) {
|
||||||
|
pBuilder->rowInfo = calloc(nRows, sizeof(SMemRowInfo));
|
||||||
|
if (pBuilder->rowInfo == NULL) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < nRows; ++i) {
|
||||||
|
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
|
||||||
|
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -18,6 +18,7 @@
|
||||||
#include "parserUtil.h"
|
#include "parserUtil.h"
|
||||||
#include "ttoken.h"
|
#include "ttoken.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
|
#include "insertParser.h"
|
||||||
|
|
||||||
bool qIsInsertSql(const char* pStr, size_t length) {
|
bool qIsInsertSql(const char* pStr, size_t length) {
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
|
@ -46,8 +47,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo**
|
||||||
return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen);
|
return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qParseInsertSql(const char* pStr, size_t length, struct SInsertStmtInfo** pInsertInfo, int64_t id, char* msg, int32_t msgLen) {
|
int32_t qParseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
||||||
return 0;
|
return parseInsertSql(pContext, pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
|
int32_t qParserConvertSql(const char* pStr, size_t length, char** pConvertSql) {
|
||||||
|
@ -173,7 +174,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet
|
||||||
assert(t != NULL);
|
assert(t != NULL);
|
||||||
|
|
||||||
if (t->n >= TSDB_FUNC_NAME_LEN) {
|
if (t->n >= TSDB_FUNC_NAME_LEN) {
|
||||||
return buildSyntaxErrMsg(msg, msgBufLen, "too long function name", t->z);
|
return buildSyntaxErrMsg(&msgBuf, "too long function name", t->z);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let's assume that it is an UDF/UDAF, if it is not a built-in function.
|
// Let's assume that it is an UDF/UDAF, if it is not a built-in function.
|
||||||
|
|
|
@ -1,3 +1,19 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include "parserUtil.h"
|
||||||
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "parser.h"
|
#include "parser.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
@ -18,7 +34,6 @@ typedef struct STableFilterCond {
|
||||||
|
|
||||||
static STableMetaInfo* addTableMetaInfo(SQueryStmtInfo* pQueryInfo, SName* name, STableMeta* pTableMeta,
|
static STableMetaInfo* addTableMetaInfo(SQueryStmtInfo* pQueryInfo, SName* name, STableMeta* pTableMeta,
|
||||||
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
|
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
|
||||||
STableMeta* tableMetaDup(STableMeta* pTableMeta);
|
|
||||||
|
|
||||||
int32_t parserValidateIdToken(SToken* pToken) {
|
int32_t parserValidateIdToken(SToken* pToken) {
|
||||||
if (pToken == NULL || pToken->z == NULL || pToken->type != TK_ID) {
|
if (pToken == NULL || pToken->z == NULL || pToken->type != TK_ID) {
|
||||||
|
@ -87,7 +102,7 @@ int32_t buildInvalidOperationMsg(SMsgBuf* pBuf, const char* msg) {
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalInfo, const char* sourceStr) {
|
int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char* sourceStr) {
|
||||||
const char* msgFormat1 = "syntax error near \'%s\'";
|
const char* msgFormat1 = "syntax error near \'%s\'";
|
||||||
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
|
const char* msgFormat2 = "syntax error near \'%s\' (%s)";
|
||||||
const char* msgFormat3 = "%s";
|
const char* msgFormat3 = "%s";
|
||||||
|
@ -95,7 +110,7 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn
|
||||||
const char* prefix = "syntax error";
|
const char* prefix = "syntax error";
|
||||||
if (sourceStr == NULL) {
|
if (sourceStr == NULL) {
|
||||||
assert(additionalInfo != NULL);
|
assert(additionalInfo != NULL);
|
||||||
snprintf(dst, dstBufLen, msgFormat1, additionalInfo);
|
snprintf(pBuf->buf, pBuf->len, msgFormat1, additionalInfo);
|
||||||
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,10 +118,10 @@ int32_t buildSyntaxErrMsg(char* dst, int32_t dstBufLen, const char* additionalIn
|
||||||
strncpy(buf, sourceStr, tListLen(buf) - 1);
|
strncpy(buf, sourceStr, tListLen(buf) - 1);
|
||||||
|
|
||||||
if (additionalInfo != NULL) {
|
if (additionalInfo != NULL) {
|
||||||
snprintf(dst, dstBufLen, msgFormat2, buf, additionalInfo);
|
snprintf(pBuf->buf, pBuf->len, msgFormat2, buf, additionalInfo);
|
||||||
} else {
|
} else {
|
||||||
const char* msgFormat = (0 == strncmp(sourceStr, prefix, strlen(prefix))) ? msgFormat3 : msgFormat1;
|
const char* msgFormat = (0 == strncmp(sourceStr, prefix, strlen(prefix))) ? msgFormat3 : msgFormat1;
|
||||||
snprintf(dst, dstBufLen, msgFormat, buf);
|
snprintf(pBuf->buf, pBuf->len, msgFormat, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
|
||||||
|
@ -1490,7 +1505,7 @@ STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
|
||||||
return pTableMeta;
|
return pTableMeta;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t getTableMetaSize(STableMeta* pTableMeta) {
|
uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
|
||||||
assert(pTableMeta != NULL);
|
assert(pTableMeta != NULL);
|
||||||
|
|
||||||
int32_t totalCols = 0;
|
int32_t totalCols = 0;
|
||||||
|
@ -1505,7 +1520,7 @@ uint32_t getTableMetaMaxSize() {
|
||||||
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
|
return sizeof(STableMeta) + TSDB_MAX_COLUMNS * sizeof(SSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMeta* tableMetaDup(STableMeta* pTableMeta) {
|
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
|
||||||
assert(pTableMeta != NULL);
|
assert(pTableMeta != NULL);
|
||||||
size_t size = getTableMetaSize(pTableMeta);
|
size_t size = getTableMetaSize(pTableMeta);
|
||||||
|
|
||||||
|
|
|
@ -284,7 +284,7 @@ static int32_t tKeywordCode(const char* z, int n) {
|
||||||
* Return the length of the token that begins at z[0].
|
* Return the length of the token that begins at z[0].
|
||||||
* Store the token type in *type before returning.
|
* Store the token type in *type before returning.
|
||||||
*/
|
*/
|
||||||
uint32_t tGetToken(char* z, uint32_t* tokenId) {
|
uint32_t tGetToken(const char* z, uint32_t* tokenId) {
|
||||||
uint32_t i;
|
uint32_t i;
|
||||||
switch (*z) {
|
switch (*z) {
|
||||||
case ' ':
|
case ' ':
|
||||||
|
@ -595,7 +595,7 @@ SToken tscReplaceStrToken(char **str, SToken *token, const char* newToken) {
|
||||||
return ntoken;
|
return ntoken;
|
||||||
}
|
}
|
||||||
|
|
||||||
SToken tStrGetToken(char* str, int32_t* i, bool isPrevOptr) {
|
SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr) {
|
||||||
SToken t0 = {0};
|
SToken t0 = {0};
|
||||||
|
|
||||||
// here we reach the end of sql string, null-terminated string
|
// here we reach the end of sql string, null-terminated string
|
||||||
|
@ -689,13 +689,12 @@ void taosCleanupKeywordsTable() {
|
||||||
}
|
}
|
||||||
|
|
||||||
SToken taosTokenDup(SToken* pToken, char* buf, int32_t len) {
|
SToken taosTokenDup(SToken* pToken, char* buf, int32_t len) {
|
||||||
assert(pToken != NULL && buf != NULL);
|
assert(pToken != NULL && buf != NULL && len > pToken->n);
|
||||||
|
|
||||||
|
strncpy(buf, pToken->z, pToken->n);
|
||||||
|
buf[pToken->n] = 0;
|
||||||
|
|
||||||
SToken token = *pToken;
|
SToken token = *pToken;
|
||||||
token.z = buf;
|
token.z = buf;
|
||||||
|
|
||||||
assert(len > token.n);
|
|
||||||
strncpy(token.z, pToken->z, pToken->n);
|
|
||||||
token.z[token.n] = 0;
|
|
||||||
|
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,7 @@ char *taosCharsetReplace(char *charsetstr) {
|
||||||
return strdup(charsetstr);
|
return strdup(charsetstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t taosStr2int64(char *str) {
|
int64_t taosStr2int64(const char *str) {
|
||||||
char *endptr = NULL;
|
char *endptr = NULL;
|
||||||
return strtoll(str, &endptr, 10);
|
return strtoll(str, &endptr, 10);
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,7 @@ int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs) {
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) {
|
bool taosMbsToUcs4(const char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len, int32_t *len) {
|
||||||
memset(ucs4, 0, ucs4_max_len);
|
memset(ucs4, 0, ucs4_max_len);
|
||||||
mbstate_t state = {0};
|
mbstate_t state = {0};
|
||||||
int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state);
|
int32_t retlen = mbsnrtowcs((wchar_t *)ucs4, (const char **)&mbs, mbsLength, ucs4_max_len / 4, &state);
|
||||||
|
|
|
@ -166,7 +166,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) {
|
||||||
return split;
|
return split;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
|
char *strnchr(const char *haystack, char needle, int32_t len, bool skipquote) {
|
||||||
for (int32_t i = 0; i < len; ++i) {
|
for (int32_t i = 0; i < len; ++i) {
|
||||||
|
|
||||||
// skip the needle in quote, jump to the end of quoted string
|
// skip the needle in quote, jump to the end of quoted string
|
||||||
|
@ -179,7 +179,7 @@ char *strnchr(char *haystack, char needle, int32_t len, bool skipquote) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (haystack[i] == needle) {
|
if (haystack[i] == needle) {
|
||||||
return &haystack[i];
|
return (char *)&haystack[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue