Merge pull request #17606 from taosdata/enh/3.0_planner_optimize
enh: insert parser refactor
This commit is contained in:
commit
ca5ecdcc4e
|
@ -169,7 +169,7 @@ typedef struct {
|
|||
// #define TD_ROW_VER(r) ((r)->ver)
|
||||
#define TD_ROW_KEY_ADDR(r) (r)
|
||||
|
||||
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
|
||||
// N.B. If without STSchema, insGetExtendedRowSize() is used to get the rowMaxBytes and
|
||||
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
|
||||
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) ((s)->tlen + TD_ROW_HEAD_LEN)
|
||||
|
||||
|
|
|
@ -46,7 +46,6 @@ extern "C" {
|
|||
|
||||
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
|
||||
#define HEARTBEAT_INTERVAL 1500 // ms
|
||||
#define SYNC_ON_TOP_OF_ASYNC 1
|
||||
|
||||
enum {
|
||||
RES_TYPE__QUERY = 1,
|
||||
|
@ -271,9 +270,8 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
|||
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
|
||||
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
||||
|
||||
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
|
||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
||||
|
||||
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
|
||||
|
||||
|
@ -349,8 +347,6 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
|||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType);
|
||||
|
||||
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly, bool inRetry);
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||
|
|
|
@ -1013,28 +1013,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
return pRequest;
|
||||
}
|
||||
|
||||
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly, bool inRetry) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
|
||||
int32_t code = buildRequest(connId, sql, sqlLen, NULL, validateOnly, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = parseSql(pRequest, false, &pQuery, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = code;
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
pRequest->inRetry = inRetry;
|
||||
pRequest->stableQuery = pQuery->stableQuery;
|
||||
|
||||
return launchQueryImpl(pRequest, pQuery, false, NULL);
|
||||
}
|
||||
|
||||
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
|
||||
SSqlCallbackWrapper* pWrapper) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
|
@ -1197,32 +1175,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo remove it soon
|
||||
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
int32_t retryNum = 0;
|
||||
int32_t code = 0;
|
||||
bool inRetry = false;
|
||||
|
||||
do {
|
||||
destroyRequest(pRequest);
|
||||
pRequest = launchQuery(connId, sql, sqlLen, validateOnly, inRetry);
|
||||
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = refreshMeta(pRequest->pTscObj, pRequest);
|
||||
if (code) {
|
||||
pRequest->code = code;
|
||||
break;
|
||||
}
|
||||
|
||||
inRetry = true;
|
||||
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
|
||||
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
||||
pEpSet->version = 0;
|
||||
|
||||
|
@ -2291,7 +2243,6 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||
tsem_init(¶m->sem, 0, 0);
|
||||
|
||||
|
@ -2301,15 +2252,4 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
|||
param->pRequest->syncQuery = true;
|
||||
}
|
||||
return param->pRequest;
|
||||
#else
|
||||
size_t sqlLen = strlen(sql);
|
||||
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly);
|
||||
return pRes;
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -261,12 +261,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
return doAsyncFetchRows(pRequest, true, true);
|
||||
#else
|
||||
return doFetchRows(pRequest, true, true);
|
||||
#endif
|
||||
|
||||
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||
SReqResultInfo *pResultInfo;
|
||||
|
@ -549,11 +544,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
doAsyncFetchRows(pRequest, false, true);
|
||||
#else
|
||||
doFetchRows(pRequest, true, true);
|
||||
#endif
|
||||
|
||||
// TODO refactor
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
@ -601,11 +592,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
doAsyncFetchRows(pRequest, false, false);
|
||||
#else
|
||||
doFetchRows(pRequest, false, false);
|
||||
#endif
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
|
@ -989,7 +976,7 @@ const void *taos_get_raw_block(TAOS_RES *res) {
|
|||
return pRequest->body.resInfo.pData;
|
||||
}
|
||||
|
||||
int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInfo) {
|
||||
int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return terrno;
|
||||
|
@ -1001,16 +988,16 @@ int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInf
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_db_route_info";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_db_route_info";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SCatalog *pCtg = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1024,7 +1011,7 @@ int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInf
|
|||
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
snprintf(dbFName, sizeof(dbFName), "%d.%s", pTscObj->acctId, db);
|
||||
|
||||
|
||||
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, dbInfo);
|
||||
if (code) {
|
||||
goto _return;
|
||||
|
@ -1038,7 +1025,7 @@ _return:
|
|||
return code;
|
||||
}
|
||||
|
||||
int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId) {
|
||||
int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return terrno;
|
||||
|
@ -1050,15 +1037,15 @@ int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_table_vgId";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_table_vgId";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SCatalog *pCtg = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -1,148 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_DATABLOCKMGT_H
|
||||
#define TDENGINE_DATABLOCKMGT_H
|
||||
|
||||
#include "catalog.h"
|
||||
#include "os.h"
|
||||
#include "query.h"
|
||||
#include "tname.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
|
||||
|
||||
typedef enum EOrderStatus {
|
||||
ORDER_STATUS_UNKNOWN = 0,
|
||||
ORDER_STATUS_ORDERED = 1,
|
||||
ORDER_STATUS_DISORDERED = 2,
|
||||
} EOrderStatus;
|
||||
|
||||
typedef enum EValStat {
|
||||
VAL_STAT_HAS = 0x0, // 0 means has val
|
||||
VAL_STAT_NONE = 0x01, // 1 means no val
|
||||
} EValStat;
|
||||
|
||||
typedef struct SBoundColumn {
|
||||
int32_t offset; // all column offset value
|
||||
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
|
||||
uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val)
|
||||
} SBoundColumn;
|
||||
|
||||
typedef struct {
|
||||
col_id_t schemaColIdx;
|
||||
col_id_t boundIdx;
|
||||
col_id_t finalIdx;
|
||||
} SBoundIdxInfo;
|
||||
|
||||
typedef struct SParsedDataColInfo {
|
||||
col_id_t numOfCols;
|
||||
col_id_t numOfBound;
|
||||
uint16_t flen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
|
||||
uint16_t extendedVarLen;
|
||||
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
|
||||
col_id_t *boundColumns; // bound column idx according to schema
|
||||
SBoundColumn *cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
int8_t orderStatus; // bound columns
|
||||
} SParsedDataColInfo;
|
||||
|
||||
typedef struct {
|
||||
uint8_t rowType; // default is 0, that is SDataRow
|
||||
int32_t rowSize;
|
||||
} SMemRowBuilder;
|
||||
|
||||
typedef struct STableDataBlocks {
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
bool ordered; // if current rows are ordered or not
|
||||
int32_t vgId; // virtual group id
|
||||
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
|
||||
int32_t numOfTables; // number of tables in current submit block
|
||||
int32_t rowSize; // row size for current table
|
||||
uint32_t nAllocSize;
|
||||
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
|
||||
uint32_t size;
|
||||
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to
|
||||
// avoid to be removed from cache
|
||||
char *pData;
|
||||
bool cloned;
|
||||
int32_t createTbReqLen;
|
||||
SParsedDataColInfo boundColumnInfo;
|
||||
SRowBuilder rowBuilder;
|
||||
} STableDataBlocks;
|
||||
|
||||
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
|
||||
STableComInfo *pTableInfo = &pBlock->pTableMeta->tableInfo;
|
||||
ASSERT(pBlock->rowSize == pTableInfo->rowSize);
|
||||
return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen +
|
||||
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void getSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset,
|
||||
col_id_t *colIdx) {
|
||||
col_id_t schemaIdx = 0;
|
||||
if (IS_DATA_COL_ORDERED(spd)) {
|
||||
schemaIdx = spd->boundColumns[idx];
|
||||
if (TD_IS_TP_ROW_T(rowType)) {
|
||||
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
|
||||
*colIdx = schemaIdx;
|
||||
} else {
|
||||
*toffset = idx * sizeof(SKvRowIdx); // the offset of SKvRowIdx
|
||||
*colIdx = idx;
|
||||
}
|
||||
} else {
|
||||
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
|
||||
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
|
||||
if (TD_IS_TP_ROW_T(rowType)) {
|
||||
*toffset = (spd->cols + schemaIdx)->toffset;
|
||||
*colIdx = schemaIdx;
|
||||
} else {
|
||||
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx);
|
||||
*colIdx = (spd->colIdxInfo + idx)->finalIdx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows) {
|
||||
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid);
|
||||
pBlocks->uid = dataBuf->pTableMeta->uid;
|
||||
pBlocks->sversion = dataBuf->pTableMeta->sversion;
|
||||
pBlocks->schemaLen = dataBuf->createTbReqLen;
|
||||
|
||||
if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
} else {
|
||||
pBlocks->numOfRows += numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
|
||||
int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
||||
void setBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
|
||||
void destroyBlockArrayList(SArray *pDataBlockList);
|
||||
void destroyBlockHashmap(SHashObj *pDataBlockHash);
|
||||
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
|
||||
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
|
||||
int32_t getDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, SArray *pBlockList,
|
||||
SVCreateTbReq *pCreateTbReq);
|
||||
int32_t mergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks);
|
||||
int32_t buildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
|
||||
|
||||
int32_t allocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
|
||||
|
||||
#endif // TDENGINE_DATABLOCKMGT_H
|
|
@ -0,0 +1,167 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_PAR_INSERT_UTIL_H
|
||||
#define TDENGINE_PAR_INSERT_UTIL_H
|
||||
|
||||
#include "parUtil.h"
|
||||
|
||||
struct SToken;
|
||||
|
||||
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
|
||||
|
||||
#define NEXT_TOKEN(pSql, sToken) \
|
||||
do { \
|
||||
int32_t index = 0; \
|
||||
sToken = tStrGetToken(pSql, &index, false); \
|
||||
pSql += index; \
|
||||
} while (0)
|
||||
|
||||
#define CHECK_CODE(expr) \
|
||||
do { \
|
||||
int32_t code = expr; \
|
||||
if (TSDB_CODE_SUCCESS != code) { \
|
||||
return code; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
typedef enum EOrderStatus {
|
||||
ORDER_STATUS_UNKNOWN = 0,
|
||||
ORDER_STATUS_ORDERED = 1,
|
||||
ORDER_STATUS_DISORDERED = 2,
|
||||
} EOrderStatus;
|
||||
|
||||
typedef enum EValStat {
|
||||
VAL_STAT_HAS = 0x0, // 0 means has val
|
||||
VAL_STAT_NONE = 0x01, // 1 means no val
|
||||
} EValStat;
|
||||
|
||||
typedef struct SBoundColumn {
|
||||
int32_t offset; // all column offset value
|
||||
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
|
||||
uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val)
|
||||
} SBoundColumn;
|
||||
|
||||
typedef struct {
|
||||
col_id_t schemaColIdx;
|
||||
col_id_t boundIdx;
|
||||
col_id_t finalIdx;
|
||||
} SBoundIdxInfo;
|
||||
|
||||
typedef struct SParsedDataColInfo {
|
||||
col_id_t numOfCols;
|
||||
col_id_t numOfBound;
|
||||
uint16_t flen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
|
||||
uint16_t extendedVarLen;
|
||||
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
|
||||
col_id_t *boundColumns; // bound column idx according to schema
|
||||
SBoundColumn *cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
int8_t orderStatus; // bound columns
|
||||
} SParsedDataColInfo;
|
||||
|
||||
typedef struct SInsertParseBaseContext {
|
||||
SParseContext *pComCxt;
|
||||
char *pSql;
|
||||
SMsgBuf msg;
|
||||
} SInsertParseBaseContext;
|
||||
|
||||
typedef struct SInsertParseContext {
|
||||
SParseContext *pComCxt; // input
|
||||
char *pSql; // input
|
||||
SMsgBuf msg; // input
|
||||
STableMeta *pTableMeta; // each table
|
||||
SParsedDataColInfo tags; // each table
|
||||
SVCreateTbReq createTblReq; // each table
|
||||
SHashObj *pVgroupsHashObj; // global
|
||||
SHashObj *pTableBlockHashObj; // global
|
||||
SHashObj *pSubTableHashObj; // global
|
||||
SArray *pVgDataBlocks; // global
|
||||
SHashObj *pTableNameHashObj; // global
|
||||
SHashObj *pDbFNameHashObj; // global
|
||||
int32_t totalNum;
|
||||
SVnodeModifOpStmt *pOutput;
|
||||
SStmtCallback *pStmtCb;
|
||||
SParseMetaCache *pMetaCache;
|
||||
char sTableName[TSDB_TABLE_NAME_LEN];
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW];
|
||||
int64_t memElapsed;
|
||||
int64_t parRowElapsed;
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef struct SInsertParseSyntaxCxt {
|
||||
SParseContext *pComCxt;
|
||||
char *pSql;
|
||||
SMsgBuf msg;
|
||||
SParseMetaCache *pMetaCache;
|
||||
} SInsertParseSyntaxCxt;
|
||||
|
||||
typedef struct SMemParam {
|
||||
SRowBuilder *rb;
|
||||
SSchema *schema;
|
||||
int32_t toffset;
|
||||
col_id_t colIdx;
|
||||
} SMemParam;
|
||||
|
||||
typedef struct {
|
||||
uint8_t rowType; // default is 0, that is SDataRow
|
||||
int32_t rowSize;
|
||||
} SMemRowBuilder;
|
||||
|
||||
typedef struct STableDataBlocks {
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
bool ordered; // if current rows are ordered or not
|
||||
int32_t vgId; // virtual group id
|
||||
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
|
||||
int32_t numOfTables; // number of tables in current submit block
|
||||
int32_t rowSize; // row size for current table
|
||||
uint32_t nAllocSize;
|
||||
uint32_t headerSize; // header for table info (uid, tid, submit metadata)
|
||||
uint32_t size;
|
||||
STableMeta *pTableMeta; // the tableMeta of current table, the table meta will be used during submit, keep a ref to
|
||||
// avoid to be removed from cache
|
||||
char *pData;
|
||||
bool cloned;
|
||||
int32_t createTbReqLen;
|
||||
SParsedDataColInfo boundColumnInfo;
|
||||
SRowBuilder rowBuilder;
|
||||
} STableDataBlocks;
|
||||
|
||||
int32_t insGetExtendedRowSize(STableDataBlocks *pBlock);
|
||||
void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo *spd, col_id_t idx, int32_t *toffset, col_id_t *colIdx);
|
||||
int32_t insSetBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *dataBuf, int32_t numOfRows);
|
||||
int32_t insSchemaIdxCompar(const void *lhs, const void *rhs);
|
||||
int32_t insBoundIdxCompar(const void *lhs, const void *rhs);
|
||||
void insSetBoundColumnInfo(SParsedDataColInfo *pColList, SSchema *pSchema, col_id_t numOfCols);
|
||||
void insDestroyBlockArrayList(SArray *pDataBlockList);
|
||||
void insDestroyBlockHashmap(SHashObj *pDataBlockHash);
|
||||
int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
|
||||
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
|
||||
SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
|
||||
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks);
|
||||
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
|
||||
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
|
||||
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
|
||||
int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema);
|
||||
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
|
||||
SArray *tagName, uint8_t tagNum);
|
||||
int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param);
|
||||
int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
|
||||
int32_t insBuildOutput(SInsertParseContext *pCxt);
|
||||
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
|
||||
|
||||
#endif // TDENGINE_PAR_INSERT_UTIL_H
|
|
@ -0,0 +1,363 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "parInsertUtil.h"
|
||||
#include "parInt.h"
|
||||
#include "parToken.h"
|
||||
#include "ttime.h"
|
||||
|
||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||
int32_t msgBufLen) {
|
||||
SMsgBuf msg = {.buf = msgBuf, .len = msgBufLen};
|
||||
SToken sToken;
|
||||
int32_t code = 0;
|
||||
char* tbName = NULL;
|
||||
|
||||
NEXT_TOKEN(pTableName, sToken);
|
||||
|
||||
if (sToken.n == 0) {
|
||||
return buildInvalidOperationMsg(&msg, "empty table name");
|
||||
}
|
||||
|
||||
code = insCreateSName(pName, &sToken, acctId, dbName, &msg);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
NEXT_TOKEN(pTableName, sToken);
|
||||
|
||||
if (sToken.n > 0) {
|
||||
return buildInvalidOperationMsg(&msg, "table name format is wrong");
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SmlExecTableHandle {
|
||||
SParsedDataColInfo tags; // each table
|
||||
SVCreateTbReq createTblReq; // each table
|
||||
} SmlExecTableHandle;
|
||||
|
||||
typedef struct SmlExecHandle {
|
||||
SHashObj* pBlockHash;
|
||||
SmlExecTableHandle tableExecHandle;
|
||||
SQuery* pQuery;
|
||||
} SSmlExecHandle;
|
||||
|
||||
static void smlDestroyTableHandle(void* pHandle) {
|
||||
SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
|
||||
destroyBoundColumnInfo(&handle->tags);
|
||||
tdDestroySVCreateTbReq(&handle->createTblReq);
|
||||
}
|
||||
|
||||
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) {
|
||||
col_id_t nCols = pColList->numOfCols;
|
||||
|
||||
pColList->numOfBound = 0;
|
||||
pColList->boundNullLen = 0;
|
||||
memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
|
||||
for (col_id_t i = 0; i < nCols; ++i) {
|
||||
pColList->cols[i].valStat = VAL_STAT_NONE;
|
||||
}
|
||||
|
||||
bool isOrdered = true;
|
||||
col_id_t lastColIdx = -1; // last column found
|
||||
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
||||
SSmlKv* kv = taosArrayGetP(cols, i);
|
||||
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
||||
col_id_t t = lastColIdx + 1;
|
||||
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, nCols, pSchema));
|
||||
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, nCols);
|
||||
if (index < 0 && t > 0) {
|
||||
index = insFindCol(&sToken, 0, t, pSchema);
|
||||
isOrdered = false;
|
||||
}
|
||||
if (index < 0) {
|
||||
uError("smlBoundColumnData. index:%d", index);
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
|
||||
uError("smlBoundColumnData. already set. index:%d", index);
|
||||
return TSDB_CODE_SML_INVALID_DATA;
|
||||
}
|
||||
lastColIdx = index;
|
||||
pColList->cols[index].valStat = VAL_STAT_HAS;
|
||||
pColList->boundColumns[pColList->numOfBound] = index;
|
||||
++pColList->numOfBound;
|
||||
switch (pSchema[t].type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
break;
|
||||
default:
|
||||
pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
|
||||
|
||||
if (!isOrdered) {
|
||||
pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
|
||||
if (NULL == pColList->colIdxInfo) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
|
||||
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
|
||||
pColIdx[i].schemaColIdx = pColList->boundColumns[i];
|
||||
pColIdx[i].boundIdx = i;
|
||||
}
|
||||
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar);
|
||||
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
|
||||
pColIdx[i].finalIdx = i;
|
||||
}
|
||||
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar);
|
||||
}
|
||||
|
||||
if (pColList->numOfCols > pColList->numOfBound) {
|
||||
memset(&pColList->boundColumns[pColList->numOfBound], 0,
|
||||
sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief No json tag for schemaless
|
||||
*
|
||||
* @param cols
|
||||
* @param tags
|
||||
* @param pSchema
|
||||
* @param ppTag
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
|
||||
SMsgBuf* msg) {
|
||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||
if (!pTagArray) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
*tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||
if (!*tagName) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
for (int i = 0; i < tags->numOfBound; ++i) {
|
||||
SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
|
||||
SSmlKv* kv = taosArrayGetP(cols, i);
|
||||
|
||||
taosArrayPush(*tagName, pTagSchema->name);
|
||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||
// strcpy(val.colName, pTagSchema->name);
|
||||
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
|
||||
val.pData = (uint8_t*)kv->value;
|
||||
val.nData = kv->length;
|
||||
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
int32_t output = 0;
|
||||
void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
|
||||
if (p == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)(p), kv->length * TSDB_NCHAR_SIZE, &output)) {
|
||||
if (errno == E2BIG) {
|
||||
taosMemoryFree(p);
|
||||
code = generateSyntaxErrMsg(msg, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
|
||||
goto end;
|
||||
}
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
|
||||
taosMemoryFree(p);
|
||||
code = buildSyntaxErrMsg(msg, buf, kv->value);
|
||||
goto end;
|
||||
}
|
||||
val.pData = p;
|
||||
val.nData = output;
|
||||
} else {
|
||||
memcpy(&val.i64, &(kv->value), kv->length);
|
||||
}
|
||||
taosArrayPush(pTagArray, &val);
|
||||
}
|
||||
|
||||
code = tTagNew(pTagArray, 1, false, ppTag);
|
||||
end:
|
||||
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
|
||||
if (p->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
taosMemoryFree(p->pData);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTagArray);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
||||
char* tableName, const char* sTableName, int32_t sTableNameLen, char* msgBuf, int16_t msgBufLen) {
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
|
||||
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
|
||||
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
|
||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||
insSetBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
|
||||
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
buildInvalidOperationMsg(&pBuf, "bound tags error");
|
||||
return ret;
|
||||
}
|
||||
STag* pTag = NULL;
|
||||
SArray* tagName = NULL;
|
||||
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &tagName, &pBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(tagName);
|
||||
return ret;
|
||||
}
|
||||
|
||||
insBuildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
|
||||
pTableMeta->tableInfo.numOfTags);
|
||||
taosArrayDestroy(tagName);
|
||||
|
||||
smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1);
|
||||
memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen);
|
||||
smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0;
|
||||
|
||||
STableDataBlocks* pDataBlock = NULL;
|
||||
ret = insGetDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
|
||||
pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
buildInvalidOperationMsg(&pBuf, "create data block error");
|
||||
return ret;
|
||||
}
|
||||
|
||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||
|
||||
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
buildInvalidOperationMsg(&pBuf, "bound cols error");
|
||||
return ret;
|
||||
}
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
|
||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
||||
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
|
||||
SMemParam param = {.rb = pBuilder};
|
||||
|
||||
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
|
||||
|
||||
int32_t rowNum = taosArrayGetSize(cols);
|
||||
if (rowNum <= 0) {
|
||||
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
|
||||
}
|
||||
ret = insAllocateMemForSize(pDataBlock, extendedRowSize * rowNum);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
buildInvalidOperationMsg(&pBuf, "allocate memory error");
|
||||
return ret;
|
||||
}
|
||||
for (int32_t r = 0; r < rowNum; ++r) {
|
||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
|
||||
tdSRowResetBuf(pBuilder, row);
|
||||
void* rowData = taosArrayGetP(cols, r);
|
||||
size_t rowDataSize = 0;
|
||||
if (format) {
|
||||
rowDataSize = taosArrayGetSize(rowData);
|
||||
}
|
||||
|
||||
// 1. set the parsed value from sql string
|
||||
for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
|
||||
|
||||
param.schema = pColSchema;
|
||||
insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx);
|
||||
|
||||
SSmlKv* kv = NULL;
|
||||
if (format) {
|
||||
if (j < rowDataSize) {
|
||||
kv = taosArrayGetP(rowData, j);
|
||||
if (rowDataSize != spd->numOfBound && j != 0 &&
|
||||
(kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
|
||||
kv = NULL;
|
||||
} else {
|
||||
j++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
||||
if (p) kv = *p;
|
||||
}
|
||||
|
||||
if (kv) {
|
||||
int32_t colLen = kv->length;
|
||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
// uError("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
||||
// uError("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(kv->type)) {
|
||||
insMemRowAppend(&pBuf, kv->value, colLen, ¶m);
|
||||
} else {
|
||||
insMemRowAppend(&pBuf, &(kv->value), colLen, ¶m);
|
||||
}
|
||||
} else {
|
||||
pBuilder->hasNone = true;
|
||||
}
|
||||
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
|
||||
TSKEY tsKey = TD_ROW_KEY(row);
|
||||
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
|
||||
}
|
||||
}
|
||||
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
||||
pBuilder->hasNone = true;
|
||||
}
|
||||
|
||||
tdSRowEnd(pBuilder);
|
||||
pDataBlock->size += extendedRowSize;
|
||||
}
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
||||
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, rowNum)) {
|
||||
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void* smlInitHandle(SQuery* pQuery) {
|
||||
SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
|
||||
if (!handle) return NULL;
|
||||
handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||
handle->pQuery = pQuery;
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
void smlDestroyHandle(void* pHandle) {
|
||||
if (!pHandle) return;
|
||||
SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
|
||||
insDestroyBlockHashmap(handle->pBlockHash);
|
||||
smlDestroyTableHandle(&handle->tableExecHandle);
|
||||
taosMemoryFree(handle);
|
||||
}
|
||||
|
||||
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
|
||||
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
|
||||
return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,486 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "parInsertUtil.h"
|
||||
#include "parInt.h"
|
||||
#include "parToken.h"
|
||||
#include "query.h"
|
||||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
typedef struct SKvParam {
|
||||
int16_t pos;
|
||||
SArray* pTagVals;
|
||||
SSchema* schema;
|
||||
char buf[TSDB_MAX_TAGS_LEN];
|
||||
} SKvParam;
|
||||
|
||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||
SVnodeModifOpStmt* modifyNode = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||
int32_t code = 0;
|
||||
SInsertParseContext insertCtx = {
|
||||
.pVgroupsHashObj = pVgHash,
|
||||
.pTableBlockHashObj = pBlockHash,
|
||||
.pOutput = (SVnodeModifOpStmt*)pQuery->pRoot,
|
||||
};
|
||||
|
||||
// merge according to vgId
|
||||
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(
|
||||
insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
|
||||
}
|
||||
|
||||
CHECK_CODE(insBuildOutput(&insertCtx));
|
||||
|
||||
insDestroyBlockArrayList(insertCtx.pVgDataBlocks);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
|
||||
if (NULL == tags) {
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||
if (!pTagArray) {
|
||||
return buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||
}
|
||||
|
||||
SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
|
||||
if (!tagName) {
|
||||
return buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
|
||||
|
||||
bool isJson = false;
|
||||
STag* pTag = NULL;
|
||||
|
||||
for (int c = 0; c < tags->numOfBound; ++c) {
|
||||
if (bind[c].is_null && bind[c].is_null[0]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
|
||||
int32_t colLen = pTagSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
|
||||
colLen = bind[c].length[0];
|
||||
}
|
||||
taosArrayPush(tagName, pTagSchema->name);
|
||||
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
|
||||
if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||
code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
|
||||
goto end;
|
||||
}
|
||||
|
||||
isJson = true;
|
||||
char* tmp = taosMemoryCalloc(1, colLen + 1);
|
||||
memcpy(tmp, bind[c].buffer, colLen);
|
||||
code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
|
||||
taosMemoryFree(tmp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
} else {
|
||||
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
|
||||
// strcpy(val.colName, pTagSchema->name);
|
||||
if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
|
||||
val.pData = (uint8_t*)bind[c].buffer;
|
||||
val.nData = colLen;
|
||||
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
int32_t output = 0;
|
||||
void* p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
|
||||
if (p == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
|
||||
if (errno == E2BIG) {
|
||||
taosMemoryFree(p);
|
||||
code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
|
||||
goto end;
|
||||
}
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
|
||||
taosMemoryFree(p);
|
||||
code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
|
||||
goto end;
|
||||
}
|
||||
val.pData = p;
|
||||
val.nData = output;
|
||||
} else {
|
||||
memcpy(&val.i64, bind[c].buffer, colLen);
|
||||
}
|
||||
taosArrayPush(pTagArray, &val);
|
||||
}
|
||||
}
|
||||
|
||||
if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
SVCreateTbReq tbReq = {0};
|
||||
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags);
|
||||
code = insBuildCreateTbMsg(pDataBlock, &tbReq);
|
||||
tdDestroySVCreateTbReq(&tbReq);
|
||||
|
||||
end:
|
||||
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
||||
STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
|
||||
if (p->type == TSDB_DATA_TYPE_NCHAR) {
|
||||
taosMemoryFreeClear(p->pData);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pTagArray);
|
||||
taosArrayDestroy(tagName);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
|
||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
||||
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
|
||||
SMemParam param = {.rb = pBuilder};
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
int32_t rowNum = bind->num;
|
||||
|
||||
CHECK_CODE(
|
||||
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||
|
||||
CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
|
||||
|
||||
for (int32_t r = 0; r < bind->num; ++r) {
|
||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
|
||||
tdSRowResetBuf(pBuilder, row);
|
||||
|
||||
for (int c = 0; c < spd->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
|
||||
|
||||
if (bind[c].num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
||||
param.schema = pColSchema;
|
||||
insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx);
|
||||
|
||||
if (bind[c].is_null && bind[c].is_null[r]) {
|
||||
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m));
|
||||
} else {
|
||||
if (bind[c].buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
int32_t colLen = pColSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
|
||||
colLen = bind[c].length[r];
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m));
|
||||
}
|
||||
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
|
||||
TSKEY tsKey = TD_ROW_KEY(row);
|
||||
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
|
||||
}
|
||||
}
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
||||
pBuilder->hasNone = true;
|
||||
}
|
||||
tdSRowEnd(pBuilder);
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
#endif
|
||||
pDataBlock->size += extendedRowSize;
|
||||
}
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
||||
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) {
|
||||
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||
int32_t rowNum) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
|
||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
||||
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
|
||||
SMemParam param = {.rb = pBuilder};
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
bool rowStart = (0 == colIdx);
|
||||
bool rowEnd = ((colIdx + 1) == spd->numOfBound);
|
||||
|
||||
if (rowStart) {
|
||||
CHECK_CODE(
|
||||
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||
CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
|
||||
}
|
||||
|
||||
for (int32_t r = 0; r < bind->num; ++r) {
|
||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r); // skip the SSubmitBlk header
|
||||
if (rowStart) {
|
||||
tdSRowResetBuf(pBuilder, row);
|
||||
} else {
|
||||
tdSRowGetBuf(pBuilder, row);
|
||||
}
|
||||
|
||||
SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];
|
||||
|
||||
if (bind->num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
||||
param.schema = pColSchema;
|
||||
insGetSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, ¶m.toffset, ¶m.colIdx);
|
||||
|
||||
if (bind->is_null && bind->is_null[r]) {
|
||||
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m));
|
||||
} else {
|
||||
if (bind->buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
int32_t colLen = pColSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
|
||||
colLen = bind->length[r];
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, ¶m));
|
||||
}
|
||||
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
|
||||
TSKEY tsKey = TD_ROW_KEY(row);
|
||||
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
|
||||
}
|
||||
|
||||
// set the null value for the columns that do not assign values
|
||||
if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
||||
pBuilder->hasNone = true;
|
||||
}
|
||||
if (rowEnd) {
|
||||
tdSRowEnd(pBuilder);
|
||||
}
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
if (rowEnd) {
|
||||
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if (rowEnd) {
|
||||
pDataBlock->size += extendedRowSize * bind->num;
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
||||
if (TSDB_CODE_SUCCESS != insSetBlockInfo(pBlocks, pDataBlock, bind->num)) {
|
||||
return buildInvalidOperationMsg(&pBuf,
|
||||
"too many rows in sql, total number of rows should be less than INT32_MAX");
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
|
||||
uint8_t timePrec) {
|
||||
if (fields) {
|
||||
*fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
|
||||
if (NULL == *fields) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSchema* schema = &pSchema[boundInfo->boundColumns[0]];
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
|
||||
(*fields)[0].precision = timePrec;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
|
||||
schema = &pSchema[boundInfo->boundColumns[i]];
|
||||
strcpy((*fields)[i].name, schema->name);
|
||||
(*fields)[i].type = schema->type;
|
||||
(*fields)[i].bytes = schema->bytes;
|
||||
}
|
||||
}
|
||||
|
||||
*fieldNum = boundInfo->numOfBound;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
|
||||
if (NULL == tags) {
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||
}
|
||||
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
|
||||
if (tags->numOfBound <= 0) {
|
||||
*fieldNum = 0;
|
||||
*fields = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||
if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
|
||||
*fieldNum = 0;
|
||||
if (fields) {
|
||||
*fields = NULL;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields,
|
||||
pDataBlock->pTableMeta->tableInfo.precision));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)block;
|
||||
|
||||
if (keepBuf) {
|
||||
taosMemoryFreeClear(pBlock->pData);
|
||||
pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
|
||||
if (NULL == pBlock->pData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
} else {
|
||||
pBlock->pData = NULL;
|
||||
}
|
||||
|
||||
pBlock->ordered = true;
|
||||
pBlock->prevTS = INT64_MIN;
|
||||
pBlock->size = sizeof(SSubmitBlk);
|
||||
pBlock->tsSource = -1;
|
||||
pBlock->numOfTables = 1;
|
||||
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
|
||||
pBlock->headerSize = pBlock->size;
|
||||
pBlock->createTbReqLen = 0;
|
||||
|
||||
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
|
||||
*pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
|
||||
((STableDataBlocks*)(*pDst))->cloned = true;
|
||||
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
|
||||
if (pBlock->pTableMeta) {
|
||||
void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
|
||||
if (NULL == pNewMeta) {
|
||||
taosMemoryFreeClear(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
|
||||
pBlock->pTableMeta = pNewMeta;
|
||||
}
|
||||
|
||||
return qResetStmtDataBlock(*pDst, false);
|
||||
}
|
||||
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
|
||||
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
|
||||
if (NULL == pBlock->pData) {
|
||||
qFreeStmtDataBlock(pBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pBlock->vgId = vgId;
|
||||
|
||||
if (pBlock->pTableMeta) {
|
||||
pBlock->pTableMeta->uid = uid;
|
||||
pBlock->pTableMeta->vgId = vgId;
|
||||
}
|
||||
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; }
|
||||
|
||||
void qFreeStmtDataBlock(void* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
|
||||
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
|
||||
taosMemoryFreeClear(pDataBlock);
|
||||
}
|
||||
|
||||
void qDestroyStmtDataBlock(void* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
|
||||
pDataBlock->cloned = false;
|
||||
insDestroyDataBlock(pDataBlock);
|
||||
}
|
|
@ -13,7 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "parInsertData.h"
|
||||
#include "parInsertUtil.h"
|
||||
|
||||
#include "catalog.h"
|
||||
#include "parInt.h"
|
||||
|
@ -81,7 +81,53 @@ static int32_t rowDataComparStable(const void* lhs, const void* rhs) {
|
|||
}
|
||||
}
|
||||
|
||||
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
|
||||
int32_t insGetExtendedRowSize(STableDataBlocks* pBlock) {
|
||||
STableComInfo* pTableInfo = &pBlock->pTableMeta->tableInfo;
|
||||
ASSERT(pBlock->rowSize == pTableInfo->rowSize);
|
||||
return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen +
|
||||
(int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1);
|
||||
}
|
||||
|
||||
void insGetSTSRowAppendInfo(uint8_t rowType, SParsedDataColInfo* spd, col_id_t idx, int32_t* toffset,
|
||||
col_id_t* colIdx) {
|
||||
col_id_t schemaIdx = 0;
|
||||
if (IS_DATA_COL_ORDERED(spd)) {
|
||||
schemaIdx = spd->boundColumns[idx];
|
||||
if (TD_IS_TP_ROW_T(rowType)) {
|
||||
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
|
||||
*colIdx = schemaIdx;
|
||||
} else {
|
||||
*toffset = idx * sizeof(SKvRowIdx); // the offset of SKvRowIdx
|
||||
*colIdx = idx;
|
||||
}
|
||||
} else {
|
||||
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
|
||||
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
|
||||
if (TD_IS_TP_ROW_T(rowType)) {
|
||||
*toffset = (spd->cols + schemaIdx)->toffset;
|
||||
*colIdx = schemaIdx;
|
||||
} else {
|
||||
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SKvRowIdx);
|
||||
*colIdx = (spd->colIdxInfo + idx)->finalIdx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t insSetBlockInfo(SSubmitBlk* pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) {
|
||||
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? 0 : dataBuf->pTableMeta->suid);
|
||||
pBlocks->uid = dataBuf->pTableMeta->uid;
|
||||
pBlocks->sversion = dataBuf->pTableMeta->sversion;
|
||||
pBlocks->schemaLen = dataBuf->createTbReqLen;
|
||||
|
||||
if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
} else {
|
||||
pBlocks->numOfRows += numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
void insSetBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t numOfCols) {
|
||||
pColList->numOfCols = numOfCols;
|
||||
pColList->numOfBound = numOfCols;
|
||||
pColList->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
|
||||
|
@ -118,7 +164,7 @@ void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, col_id_t
|
|||
pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
||||
}
|
||||
|
||||
int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
|
||||
int32_t insSchemaIdxCompar(const void* lhs, const void* rhs) {
|
||||
uint16_t left = *(uint16_t*)lhs;
|
||||
uint16_t right = *(uint16_t*)rhs;
|
||||
|
||||
|
@ -129,7 +175,7 @@ int32_t schemaIdxCompar(const void* lhs, const void* rhs) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t boundIdxCompar(const void* lhs, const void* rhs) {
|
||||
int32_t insBoundIdxCompar(const void* lhs, const void* rhs) {
|
||||
uint16_t left = *(uint16_t*)POINTER_SHIFT(lhs, sizeof(uint16_t));
|
||||
uint16_t right = *(uint16_t*)POINTER_SHIFT(rhs, sizeof(uint16_t));
|
||||
|
||||
|
@ -178,7 +224,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
|
|||
|
||||
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
|
||||
SSchema* pSchema = getTableColumnSchema(dataBuf->pTableMeta);
|
||||
setBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
|
||||
insSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
|
||||
|
||||
dataBuf->ordered = true;
|
||||
dataBuf->prevTS = INT64_MIN;
|
||||
|
@ -192,7 +238,7 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
|
||||
int32_t insBuildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
|
||||
SEncoder coder = {0};
|
||||
char* pBuf;
|
||||
int32_t len;
|
||||
|
@ -222,7 +268,7 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
|
|||
return code;
|
||||
}
|
||||
|
||||
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
void insDestroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -237,9 +283,9 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
|||
taosMemoryFreeClear(pDataBlock);
|
||||
}
|
||||
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||
int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
|
||||
SVCreateTbReq* pCreateTbReq) {
|
||||
int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||
int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
|
||||
SArray* pBlockList, SVCreateTbReq* pCreateTbReq) {
|
||||
*dataBlocks = NULL;
|
||||
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
|
||||
if (t1 != NULL) {
|
||||
|
@ -253,9 +299,9 @@ int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32
|
|||
}
|
||||
|
||||
if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
|
||||
ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
|
||||
ret = insBuildCreateTbMsg(*dataBlocks, pCreateTbReq);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
destroyDataBlock(*dataBlocks);
|
||||
insDestroyDataBlock(*dataBlocks);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -283,7 +329,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
|||
return result;
|
||||
}
|
||||
|
||||
void destroyBlockArrayList(SArray* pDataBlockList) {
|
||||
void insDestroyBlockArrayList(SArray* pDataBlockList) {
|
||||
if (pDataBlockList == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -291,13 +337,13 @@ void destroyBlockArrayList(SArray* pDataBlockList) {
|
|||
size_t size = taosArrayGetSize(pDataBlockList);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void* p = taosArrayGetP(pDataBlockList, i);
|
||||
destroyDataBlock(p);
|
||||
insDestroyDataBlock(p);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pDataBlockList);
|
||||
}
|
||||
|
||||
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
|
||||
void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
|
||||
if (pDataBlockHash == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -305,7 +351,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash) {
|
|||
void** p1 = taosHashIterate(pDataBlockHash, NULL);
|
||||
while (p1) {
|
||||
STableDataBlocks* pBlocks = *p1;
|
||||
destroyDataBlock(pBlocks);
|
||||
insDestroyDataBlock(pBlocks);
|
||||
|
||||
p1 = taosHashIterate(pDataBlockHash, p1);
|
||||
}
|
||||
|
@ -375,7 +421,7 @@ static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo*
|
|||
}
|
||||
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
|
||||
|
||||
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(dataBuf);
|
||||
SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||
char* pBlockData = pBlocks->data + pBlocks->schemaLen;
|
||||
int n = 0;
|
||||
|
@ -545,7 +591,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
|
|||
|
||||
tdResetSBlockRowMerger(*ppBlkRowMerger);
|
||||
|
||||
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(dataBuf);
|
||||
SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||
char* pBlockData = pBlocks->data + pBlocks->schemaLen;
|
||||
int32_t n = 0;
|
||||
|
@ -679,7 +725,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
|
|||
return pBlock->dataLen + pBlock->schemaLen;
|
||||
}
|
||||
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
|
||||
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
|
||||
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
||||
int code = 0;
|
||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||
|
@ -696,13 +742,13 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
|||
if (pBlocks->numOfRows > 0) {
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId; // for schemaless, restore origin vgId
|
||||
int32_t ret = getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId),
|
||||
TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0, pOneTableBlock->pTableMeta, &dataBuf,
|
||||
pVnodeDataBlockList, NULL);
|
||||
int32_t ret = insGetDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId,
|
||||
sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
|
||||
pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
tdFreeSBlockRowMerger(pBlkRowMerger);
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
destroyBlockArrayList(pVnodeDataBlockList);
|
||||
insDestroyBlockArrayList(pVnodeDataBlockList);
|
||||
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
|
||||
return ret;
|
||||
}
|
||||
|
@ -721,7 +767,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
|||
} else { // failed to allocate memory, free already allocated memory and return error code
|
||||
tdFreeSBlockRowMerger(pBlkRowMerger);
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
destroyBlockArrayList(pVnodeDataBlockList);
|
||||
insDestroyBlockArrayList(pVnodeDataBlockList);
|
||||
taosMemoryFreeClear(dataBuf->pData);
|
||||
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -734,7 +780,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
|||
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
|
||||
tdFreeSBlockRowMerger(pBlkRowMerger);
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
destroyBlockArrayList(pVnodeDataBlockList);
|
||||
insDestroyBlockArrayList(pVnodeDataBlockList);
|
||||
taosMemoryFreeClear(dataBuf->pData);
|
||||
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
|
||||
return code;
|
||||
|
@ -767,7 +813,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
|
||||
int32_t insAllocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
|
||||
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
|
||||
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
|
||||
|
||||
|
@ -789,35 +835,7 @@ int32_t allocateMemForSize(STableDataBlocks* pDataBlock, int32_t allSize) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t allocateMemIfNeed(STableDataBlocks* pDataBlock, int32_t rowSize, int32_t* numOfRows) {
|
||||
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
|
||||
const int factor = 5;
|
||||
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
|
||||
|
||||
// expand the allocated size
|
||||
if (remain < rowSize * factor) {
|
||||
while (remain < rowSize * factor) {
|
||||
pDataBlock->nAllocSize = (uint32_t)(pDataBlock->nAllocSize * 1.5);
|
||||
remain = pDataBlock->nAllocSize - pDataBlock->size;
|
||||
}
|
||||
|
||||
char* tmp = taosMemoryRealloc(pDataBlock->pData, (size_t)pDataBlock->nAllocSize);
|
||||
if (tmp != NULL) {
|
||||
pDataBlock->pData = tmp;
|
||||
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
|
||||
} else {
|
||||
// do nothing, if allocate more memory failed
|
||||
pDataBlock->nAllocSize = nAllocSizeOld;
|
||||
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
|
||||
int32_t insInitRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo* pColInfo) {
|
||||
ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
|
||||
tdSRowInit(pBuilder, schemaVer);
|
||||
tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen,
|
||||
|
@ -825,101 +843,218 @@ int initRowBuilder(SRowBuilder* pBuilder, int16_t schemaVer, SParsedDataColInfo*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)block;
|
||||
static char* tableNameGetPosition(SToken* pToken, char target) {
|
||||
bool inEscape = false;
|
||||
bool inQuote = false;
|
||||
char quotaStr = 0;
|
||||
|
||||
if (keepBuf) {
|
||||
taosMemoryFreeClear(pBlock->pData);
|
||||
pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
|
||||
if (NULL == pBlock->pData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
for (uint32_t i = 0; i < pToken->n; ++i) {
|
||||
if (*(pToken->z + i) == target && (!inEscape) && (!inQuote)) {
|
||||
return pToken->z + i;
|
||||
}
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
|
||||
if (*(pToken->z + i) == TS_ESCAPE_CHAR) {
|
||||
if (!inQuote) {
|
||||
inEscape = !inEscape;
|
||||
}
|
||||
}
|
||||
|
||||
if (*(pToken->z + i) == '\'' || *(pToken->z + i) == '"') {
|
||||
if (!inEscape) {
|
||||
if (!inQuote) {
|
||||
quotaStr = *(pToken->z + i);
|
||||
inQuote = !inQuote;
|
||||
} else if (quotaStr == *(pToken->z + i)) {
|
||||
inQuote = !inQuote;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const char* dbName, SMsgBuf* pMsgBuf) {
|
||||
const char* msg1 = "name too long";
|
||||
const char* msg2 = "invalid database name";
|
||||
const char* msg3 = "db is not specified";
|
||||
const char* msg4 = "invalid table name";
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
char* p = tableNameGetPosition(pTableName, TS_PATH_DELIMITER[0]);
|
||||
|
||||
if (p != NULL) { // db has been specified in sql string so we ignore current db path
|
||||
assert(*p == TS_PATH_DELIMITER[0]);
|
||||
|
||||
int32_t dbLen = p - pTableName->z;
|
||||
if (dbLen <= 0) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
char name[TSDB_DB_FNAME_LEN] = {0};
|
||||
strncpy(name, pTableName->z, dbLen);
|
||||
int32_t actualDbLen = strdequote(name);
|
||||
|
||||
code = tNameSetDbName(pName, acctId, name, actualDbLen);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
|
||||
int32_t tbLen = pTableName->n - dbLen - 1;
|
||||
if (tbLen <= 0) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||
}
|
||||
|
||||
char tbname[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
strncpy(tbname, p + 1, tbLen);
|
||||
/*tbLen = */ strdequote(tbname);
|
||||
|
||||
code = tNameFromString(pName, tbname, T_NAME_TABLE);
|
||||
if (code != 0) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
} else { // get current DB name first, and then set it into path
|
||||
if (pTableName->n >= TSDB_TABLE_NAME_LEN) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
|
||||
assert(pTableName->n < TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
strncpy(name, pTableName->z, pTableName->n);
|
||||
strdequote(name);
|
||||
|
||||
if (dbName == NULL) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
}
|
||||
|
||||
code = tNameSetDbName(pName, acctId, dbName, strlen(dbName));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tNameFromString(pName, name, T_NAME_TABLE);
|
||||
if (code != 0) {
|
||||
code = buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
|
||||
while (start < end) {
|
||||
if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
|
||||
return start;
|
||||
}
|
||||
++start;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
void insBuildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname,
|
||||
SArray* tagName, uint8_t tagNum) {
|
||||
pTbReq->type = TD_CHILD_TABLE;
|
||||
pTbReq->name = strdup(tname);
|
||||
pTbReq->ctb.suid = suid;
|
||||
pTbReq->ctb.tagNum = tagNum;
|
||||
if (sname) pTbReq->ctb.stbName = strdup(sname);
|
||||
pTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
pTbReq->ctb.tagName = taosArrayDup(tagName);
|
||||
pTbReq->ttl = TSDB_DEFAULT_TABLE_TTL;
|
||||
pTbReq->commentLen = -1;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t insMemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
|
||||
SMemParam* pa = (SMemParam*)param;
|
||||
SRowBuilder* rb = pa->rb;
|
||||
|
||||
if (value == NULL) { // it is a null data
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
|
||||
const char* rowEnd = tdRowEnd(rb->pBuf);
|
||||
STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
|
||||
} else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
int32_t output = 0;
|
||||
const char* rowEnd = tdRowEnd(rb->pBuf);
|
||||
if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||
if (errno == E2BIG) {
|
||||
return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pa->schema->name);
|
||||
}
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||
return buildSyntaxErrMsg(pMsgBuf, buf, value);
|
||||
}
|
||||
varDataSetLen(rowEnd, output);
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
|
||||
} else {
|
||||
pBlock->pData = NULL;
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
|
||||
}
|
||||
|
||||
pBlock->ordered = true;
|
||||
pBlock->prevTS = INT64_MIN;
|
||||
pBlock->size = sizeof(SSubmitBlk);
|
||||
pBlock->tsSource = -1;
|
||||
pBlock->numOfTables = 1;
|
||||
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
|
||||
pBlock->headerSize = pBlock->size;
|
||||
pBlock->createTbReqLen = 0;
|
||||
|
||||
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
|
||||
*pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
int32_t insCheckTimestamp(STableDataBlocks* pDataBlocks, const char* start) {
|
||||
// once the data block is disordered, we do NOT keep previous timestamp any more
|
||||
if (!pDataBlocks->ordered) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
|
||||
((STableDataBlocks*)(*pDst))->cloned = true;
|
||||
TSKEY k = *(TSKEY*)start;
|
||||
if (k <= pDataBlocks->prevTS) {
|
||||
pDataBlocks->ordered = false;
|
||||
}
|
||||
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
|
||||
if (pBlock->pTableMeta) {
|
||||
void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
|
||||
if (NULL == pNewMeta) {
|
||||
taosMemoryFreeClear(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
pDataBlocks->prevTS = k;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
|
||||
SSubmitReq* submit = (SSubmitReq*)blocks->pData;
|
||||
submit->header.vgId = htonl(blocks->vg.vgId);
|
||||
submit->header.contLen = htonl(blocks->size);
|
||||
submit->length = submit->header.contLen;
|
||||
submit->numOfBlocks = htonl(blocks->numOfTables);
|
||||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
int32_t numOfBlocks = blocks->numOfTables;
|
||||
while (numOfBlocks--) {
|
||||
int32_t dataLen = blk->dataLen;
|
||||
int32_t schemaLen = blk->schemaLen;
|
||||
blk->uid = htobe64(blk->uid);
|
||||
blk->suid = htobe64(blk->suid);
|
||||
blk->sversion = htonl(blk->sversion);
|
||||
blk->dataLen = htonl(blk->dataLen);
|
||||
blk->schemaLen = htonl(blk->schemaLen);
|
||||
blk->numOfRows = htonl(blk->numOfRows);
|
||||
blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t insBuildOutput(SInsertParseContext* pCxt) {
|
||||
size_t numOfVg = taosArrayGetSize(pCxt->pVgDataBlocks);
|
||||
pCxt->pOutput->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
if (NULL == pCxt->pOutput->pDataBlocks) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
for (size_t i = 0; i < numOfVg; ++i) {
|
||||
STableDataBlocks* src = taosArrayGetP(pCxt->pVgDataBlocks, i);
|
||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == dst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
|
||||
pBlock->pTableMeta = pNewMeta;
|
||||
taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
||||
dst->numOfTables = src->numOfTables;
|
||||
dst->size = src->size;
|
||||
TSWAP(dst->pData, src->pData);
|
||||
buildMsgHeader(src, dst);
|
||||
taosArrayPush(pCxt->pOutput->pDataBlocks, &dst);
|
||||
}
|
||||
|
||||
return qResetStmtDataBlock(*pDst, false);
|
||||
}
|
||||
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
|
||||
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
|
||||
if (NULL == pBlock->pData) {
|
||||
qFreeStmtDataBlock(pBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pBlock->vgId = vgId;
|
||||
|
||||
if (pBlock->pTableMeta) {
|
||||
pBlock->pTableMeta->uid = uid;
|
||||
pBlock->pTableMeta->vgId = vgId;
|
||||
}
|
||||
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; }
|
||||
|
||||
void qFreeStmtDataBlock(void* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
|
||||
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
|
||||
taosMemoryFreeClear(pDataBlock);
|
||||
}
|
||||
|
||||
void qDestroyStmtDataBlock(void* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
|
||||
pDataBlock->cloned = false;
|
||||
destroyDataBlock(pDataBlock);
|
||||
}
|
Loading…
Reference in New Issue