Merge remote-tracking branch 'origin/3.0' into feature/qnode
This commit is contained in:
commit
71365eb53d
|
@ -71,8 +71,8 @@ ELSE ()
|
|||
ENDIF ()
|
||||
|
||||
IF (${SANITIZER} MATCHES "true")
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||
MESSAGE(STATUS "Will compile with Address Sanitizer!")
|
||||
ELSE ()
|
||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
_ "github.com/taosdata/driver-go/v2/taosRestful"
|
||||
)
|
||||
|
||||
func createStable(taos *sql.DB) {
|
||||
_, err := taos.Exec("CREATE DATABASE power")
|
||||
if err != nil {
|
||||
fmt.Println("failed to create database, err:", err)
|
||||
}
|
||||
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
|
||||
if err != nil {
|
||||
fmt.Println("failed to create stable, err:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func insertData(taos *sql.DB) {
|
||||
sql := `INSERT INTO power.d1001 USING power.meters TAGS(Beijing.Chaoyang, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
|
||||
power.d1002 USING power.meters TAGS(Beijing.Chaoyang, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
|
||||
power.d1003 USING power.meters TAGS(Beijing.Haidian, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
|
||||
power.d1004 USING power.meters TAGS(Beijing.Haidian, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
|
||||
result, err := taos.Exec(sql)
|
||||
if err != nil {
|
||||
fmt.Println("failed to insert, err:", err)
|
||||
return
|
||||
}
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
fmt.Println("failed to get affected rows, err:", err)
|
||||
return
|
||||
}
|
||||
fmt.Println("RowsAffected", rowsAffected)
|
||||
}
|
||||
|
||||
func main() {
|
||||
var taosDSN = "root:taosdata@http(localhost:6041)/"
|
||||
taos, err := sql.Open("taosRestful", taosDSN)
|
||||
if err != nil {
|
||||
fmt.Println("failed to connect TDengine, err:", err)
|
||||
return
|
||||
}
|
||||
defer taos.Close()
|
||||
createStable(taos)
|
||||
insertData(taos)
|
||||
}
|
|
@ -190,7 +190,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
|||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
||||
const SColumnInfoData* pSource, uint32_t numOfRow2);
|
||||
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
|
||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
|
||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
|
||||
|
||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
||||
void colDataTrim(SColumnInfoData* pColumnInfoData);
|
||||
|
|
|
@ -1352,7 +1352,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t sversion;
|
||||
int32_t tversion;
|
||||
} SResReadyRsp;
|
||||
|
@ -2524,7 +2524,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
|
|||
buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum);
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
|
||||
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
|
||||
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
|
||||
if (pRsp->blockNum != 0) {
|
||||
|
|
|
@ -230,23 +230,23 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
#define qDebug(...) \
|
||||
do { \
|
||||
if (qDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
#define qDebug(...) \
|
||||
do { \
|
||||
if (qDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
#define qTrace(...) \
|
||||
do { \
|
||||
if (qDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
#define qTrace(...) \
|
||||
do { \
|
||||
if (qDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
#define qDebugL(...) \
|
||||
do { \
|
||||
if (qDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
#define qDebugL(...) \
|
||||
do { \
|
||||
if (qDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define QRY_ERR_RET(c) \
|
||||
|
|
|
@ -39,64 +39,50 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SUCCESS 0
|
||||
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
|
||||
|
||||
// rpc
|
||||
#define TSDB_CODE_RPC_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001)
|
||||
#define TSDB_CODE_RPC_AUTH_REQUIRED TAOS_DEF_ERROR_CODE(0, 0x0002)
|
||||
#define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0003)
|
||||
#define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0004)
|
||||
#define TSDB_CODE_RPC_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005)
|
||||
#define TSDB_CODE_RPC_ALREADY_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0006)
|
||||
#define TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x0007)
|
||||
#define TSDB_CODE_RPC_MISMATCHED_LINK_ID TAOS_DEF_ERROR_CODE(0, 0x0008)
|
||||
#define TSDB_CODE_RPC_TOO_SLOW TAOS_DEF_ERROR_CODE(0, 0x0009)
|
||||
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x000A)
|
||||
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x000B)
|
||||
#define TSDB_CODE_RPC_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x000C)
|
||||
#define TSDB_CODE_RPC_UNEXPECTED_RESPONSE TAOS_DEF_ERROR_CODE(0, 0x000D)
|
||||
#define TSDB_CODE_RPC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x000E)
|
||||
#define TSDB_CODE_RPC_INVALID_TRAN_ID TAOS_DEF_ERROR_CODE(0, 0x000F)
|
||||
#define TSDB_CODE_RPC_INVALID_SESSION_ID TAOS_DEF_ERROR_CODE(0, 0x0010)
|
||||
#define TSDB_CODE_RPC_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0011)
|
||||
#define TSDB_CODE_RPC_INVALID_RESPONSE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0012)
|
||||
#define TSDB_CODE_RPC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0013)
|
||||
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
|
||||
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
|
||||
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016)
|
||||
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
|
||||
|
||||
//common & util
|
||||
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100)
|
||||
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0101)
|
||||
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0102)
|
||||
#define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0103)
|
||||
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0104)
|
||||
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0105)
|
||||
#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0106)
|
||||
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0107)
|
||||
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0108)
|
||||
#define TSDB_CODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0109)
|
||||
#define TSDB_CODE_INVALID_JSON_FORMAT TAOS_DEF_ERROR_CODE(0, 0x010A)
|
||||
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x010B)
|
||||
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x010C)
|
||||
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x010D)
|
||||
#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x010E)
|
||||
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x010F)
|
||||
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0110)
|
||||
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0111)
|
||||
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0112)
|
||||
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0113)
|
||||
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0114)
|
||||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
|
||||
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
|
||||
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
|
||||
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0118)
|
||||
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001)
|
||||
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0002)
|
||||
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0003)
|
||||
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010)
|
||||
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011)
|
||||
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012)
|
||||
#define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x0013)
|
||||
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0014)
|
||||
#define TSDB_CODE_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0015)
|
||||
#define TSDB_CODE_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0016)
|
||||
#define TSDB_CODE_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x0017)
|
||||
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x0018)
|
||||
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x0019)
|
||||
#define TSDB_CODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x001A)
|
||||
#define TSDB_CODE_INVALID_JSON_FORMAT TAOS_DEF_ERROR_CODE(0, 0x001B)
|
||||
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x001C)
|
||||
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x001D)
|
||||
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x001E)
|
||||
#define TSDB_CODE_MEMORY_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x001F)
|
||||
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0020)
|
||||
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0021)
|
||||
#define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0022)
|
||||
#define TSDB_CODE_OPS_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x0023)
|
||||
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0024)
|
||||
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0025)
|
||||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0026)
|
||||
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0027)
|
||||
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
|
||||
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
|
||||
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0152)
|
||||
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0153)
|
||||
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0154)
|
||||
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0155)
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0042)
|
||||
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0043)
|
||||
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0044)
|
||||
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0045)
|
||||
|
||||
// rpc
|
||||
#define TSDB_CODE_RPC_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0100)
|
||||
#define TSDB_CODE_RPC_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0101)
|
||||
#define TSDB_CODE_RPC_NETWORK_UNAVAIL TAOS_DEF_ERROR_CODE(0, 0x0102)
|
||||
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0103)
|
||||
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0104)
|
||||
|
||||
//client
|
||||
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
|
||||
|
@ -296,8 +282,8 @@ int32_t* taosGetErrno();
|
|||
// dnode
|
||||
#define TSDB_CODE_NODE_REDIRECT TAOS_DEF_ERROR_CODE(0, 0x0400)
|
||||
#define TSDB_CODE_NODE_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0401)
|
||||
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
|
||||
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404)
|
||||
#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0402)
|
||||
#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403)
|
||||
|
||||
// vnode
|
||||
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500)
|
||||
|
|
|
@ -341,7 +341,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
|
|||
|
||||
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
|
||||
|
||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
|
||||
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -350,7 +350,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
|
||||
int32_t index = (tsColumnIndex == -1)? 0:tsColumnIndex;
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
|
||||
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -134,10 +134,10 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
|
|||
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
|
||||
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
|
||||
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
|
||||
|
||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dmProcessFetchRsp(SRpcMsg *pMsg);
|
||||
|
||||
// dmNodes.c
|
||||
int32_t dmOpenNode(SMgmtWrapper *pWrapper);
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "dmNodes.h"
|
||||
#include "qworker.h"
|
||||
|
||||
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
@ -279,38 +280,42 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus)
|
|||
|
||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
dDebug("msg:%p, net test req will be processed", pMsg);
|
||||
SRpcMsg rsp = {.code = 0, .info = pMsg->info};
|
||||
|
||||
SRpcMsg rsp = {.info = pMsg->info};
|
||||
rsp.pCont = rpcMallocCont(pMsg->contLen);
|
||||
if (rsp.pCont == NULL) {
|
||||
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
rsp.contLen = pMsg->contLen;
|
||||
}
|
||||
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
dDebug("msg:%p, server startup status req will be processed", pMsg);
|
||||
|
||||
SServerStatusRsp statusRsp = {0};
|
||||
dmGetServerStartupStatus(pDnode, &statusRsp);
|
||||
|
||||
SRpcMsg rspMsg = {.info = pMsg->info};
|
||||
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
|
||||
if (rspLen < 0) {
|
||||
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
SRpcMsg rsp = {.info = pMsg->info};
|
||||
int32_t contLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
|
||||
if (contLen < 0) {
|
||||
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
rsp.pCont = rpcMallocCont(contLen);
|
||||
if (rsp.pCont != NULL) {
|
||||
tSerializeSServerStatusRsp(rsp.pCont, contLen, &statusRsp);
|
||||
rsp.contLen = contLen;
|
||||
}
|
||||
}
|
||||
|
||||
void *pRsp = rpcMallocCont(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
|
||||
rspMsg.pCont = pRsp;
|
||||
rspMsg.contLen = rspLen;
|
||||
|
||||
_OVER:
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
void dmProcessFetchRsp(SRpcMsg *pMsg) {
|
||||
qWorkerProcessFetchRsp(NULL, NULL, pMsg);
|
||||
// rpcFreeCont(pMsg->pCont);
|
||||
}
|
|
@ -142,13 +142,14 @@ static inline int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, SRpcMsg
|
|||
queue->tail = headLen + bodyLen;
|
||||
} else if (remain < 8 + headLen) {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, remain - 8);
|
||||
memcpy(queue->pBuffer, (char*)pHead + remain - 8, rawHeadLen - (remain - 8));
|
||||
memcpy(queue->pBuffer, (char *)pHead + remain - 8, rawHeadLen - (remain - 8));
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
||||
queue->tail = headLen - (remain - 8) + bodyLen;
|
||||
} else if (remain < 8 + headLen + bodyLen) {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer + queue->tail + 8 + headLen, pBody, remain - 8 - headLen);
|
||||
if (rawBodyLen > 0) memcpy(queue->pBuffer, (char*)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
||||
if (rawBodyLen > 0)
|
||||
memcpy(queue->pBuffer, (char *)pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
||||
queue->tail = bodyLen - (remain - 8 - headLen);
|
||||
} else {
|
||||
memcpy(queue->pBuffer + queue->tail + 8, pHead, rawHeadLen);
|
||||
|
@ -312,12 +313,7 @@ static void *dmConsumChildQueue(void *param) {
|
|||
code = dmProcessNodeMsg(pWrapper, pMsg);
|
||||
if (code != 0) {
|
||||
dError("node:%s, failed to process msg:%p since %s, put into pqueue", proc->name, pMsg, terrstr());
|
||||
SRpcMsg rsp = {
|
||||
.code = (terrno != 0 ? terrno : code),
|
||||
.pCont = pMsg->info.rsp,
|
||||
.contLen = pMsg->info.rspLen,
|
||||
.info = pMsg->info,
|
||||
};
|
||||
SRpcMsg rsp = {.code = (terrno != 0 ? terrno : code), .info = pMsg->info};
|
||||
dmPutToProcPQueue(proc, &rsp, DND_FUNC_RSP);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
|
@ -469,8 +465,18 @@ void dmPutToProcPQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
|||
taosMsleep(retry);
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
}
|
||||
|
||||
int32_t dmPutToProcCQueue(SProc *proc, SRpcMsg *pMsg, EProcFuncType ftype) {
|
||||
return dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
|
||||
int32_t code = dmPushToProcQueue(proc, proc->cqueue, pMsg, ftype);
|
||||
if (code == 0) {
|
||||
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -15,11 +15,10 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmMgmt.h"
|
||||
#include "qworker.h"
|
||||
|
||||
#define INTERNAL_USER "_dnd"
|
||||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_SECRET "_pwd"
|
||||
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
||||
static void dmSendRsp(SRpcMsg *pMsg);
|
||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
|
||||
SRpcConnInfo connInfo = {0};
|
||||
|
@ -49,49 +48,42 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||
SDnodeTrans * pTrans = &pDnode->trans;
|
||||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
int32_t code = -1;
|
||||
SRpcMsg * pMsg = NULL;
|
||||
bool needRelease = false;
|
||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||
SRpcMsg *pMsg = NULL;
|
||||
SMgmtWrapper *pWrapper = NULL;
|
||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||
|
||||
dTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
|
||||
pRpc->info.noResp = 0;
|
||||
pRpc->info.persistHandle = 0;
|
||||
pRpc->info.wrapper = NULL;
|
||||
pRpc->info.node = NULL;
|
||||
pRpc->info.rsp = NULL;
|
||||
pRpc->info.rspLen = 0;
|
||||
|
||||
if (pRpc->msgType == TDMT_DND_NET_TEST) {
|
||||
dmProcessNetTestReq(pDnode, pRpc);
|
||||
goto _OVER_JUST_FREE;
|
||||
return;
|
||||
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
|
||||
qWorkerProcessFetchRsp(NULL, NULL, pRpc);
|
||||
goto _OVER_JUST_FREE;
|
||||
dmProcessFetchRsp(pRpc);
|
||||
return;
|
||||
} else {
|
||||
}
|
||||
|
||||
if (pDnode->status != DND_STAT_RUNNING) {
|
||||
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
|
||||
dmProcessServerStartupStatus(pDnode, pRpc);
|
||||
goto _OVER_JUST_FREE;
|
||||
return;
|
||||
} else {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
goto _OVER_RSP_FREE;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (IsReq(pRpc) && pRpc->pCont == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
goto _OVER_RSP_FREE;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (pHandle->defaultNtype == NODE_END) {
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
goto _OVER_RSP_FREE;
|
||||
goto _OVER;
|
||||
} else {
|
||||
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
||||
if (pHandle->needCheckVgId) {
|
||||
|
@ -106,15 +98,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
goto _OVER_RSP_FREE;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (dmMarkWrapper(pWrapper) != 0) {
|
||||
goto _OVER_RSP_FREE;
|
||||
pWrapper = NULL;
|
||||
goto _OVER;
|
||||
} else {
|
||||
needRelease = true;
|
||||
pRpc->info.wrapper = pWrapper;
|
||||
}
|
||||
|
||||
|
@ -134,24 +126,23 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
_OVER:
|
||||
if (code == 0) {
|
||||
if (pWrapper != NULL && InParentProc(pWrapper)) {
|
||||
dTrace("msg:%p, is freed after push to cqueue", pMsg);
|
||||
taosFreeQitem(pMsg);
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
}
|
||||
} else {
|
||||
if (code != 0) {
|
||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||
if (terrno != 0) code = terrno;
|
||||
|
||||
if (IsReq(pRpc)) {
|
||||
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
|
||||
if (pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) {
|
||||
code = TSDB_CODE_NODE_REDIRECT;
|
||||
}
|
||||
SRpcMsg rsp = {.code = code, .info = pRpc->info};
|
||||
|
||||
if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG &&
|
||||
pRpc->msgType < TDMT_VND_MSG) {
|
||||
dmBuildMnodeRedirectRsp(pDnode, &rsp);
|
||||
}
|
||||
|
||||
if (pWrapper != NULL) {
|
||||
dmSendRsp(&rsp);
|
||||
} else {
|
||||
rpcSendResponse(&rsp);
|
||||
}
|
||||
SRpcMsg rspMsg = {.code = code, .info = pRpc->info};
|
||||
tmsgSendRsp(&rspMsg);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
|
@ -159,19 +150,7 @@ _OVER:
|
|||
rpcFreeCont(pRpc->pCont);
|
||||
}
|
||||
|
||||
if (needRelease) {
|
||||
dmReleaseWrapper(pWrapper);
|
||||
}
|
||||
return;
|
||||
|
||||
_OVER_JUST_FREE:
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
return;
|
||||
|
||||
_OVER_RSP_FREE:
|
||||
rpcFreeCont(pRpc->pCont);
|
||||
SRpcMsg simpleRsp = {.code = terrno, .info = pRpc->info};
|
||||
rpcSendResponse(&simpleRsp);
|
||||
dmReleaseWrapper(pWrapper);
|
||||
}
|
||||
|
||||
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||
|
@ -179,11 +158,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
|||
|
||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
SArray * pArray = (*pWrapper->func.getHandlesFp)();
|
||||
SArray *pArray = (*pWrapper->func.getHandlesFp)();
|
||||
if (pArray == NULL) return -1;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
|
||||
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
||||
if (pMgmt->needCheckVgId) {
|
||||
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
||||
|
@ -218,13 +197,25 @@ static inline void dmSendRsp(SRpcMsg *pMsg) {
|
|||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||
if (InChildProc(pWrapper)) {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
} else {
|
||||
rpcSendResponse(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
SMEpSet msg = {0};
|
||||
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &msg.epSet);
|
||||
|
||||
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
|
||||
pMsg->pCont = rpcMallocCont(contLen);
|
||||
if (pMsg->pCont == NULL) {
|
||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tSerializeSMEpSet(pMsg->pCont, contLen, &msg);
|
||||
pMsg->contLen = contLen;
|
||||
}
|
||||
}
|
||||
|
||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||
SMEpSet msg = {.epSet = *pNewEpSet};
|
||||
|
@ -246,8 +237,6 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
|||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||
if (InChildProc(pWrapper)) {
|
||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_REGIST);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
} else {
|
||||
rpcRegisterBrokenLinkArg(pMsg);
|
||||
}
|
||||
|
@ -275,7 +264,6 @@ int32_t dmInitClient(SDnode *pDnode) {
|
|||
rpcInit.sessions = 1024;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.user = INTERNAL_USER;
|
||||
rpcInit.parent = pDnode;
|
||||
rpcInit.rfp = rpcRfp;
|
||||
|
||||
|
@ -343,34 +331,3 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
|
|||
};
|
||||
return msgCb;
|
||||
}
|
||||
|
||||
static void dmSendMnodeRedirectRsp(SRpcMsg *pMsg) {
|
||||
SDnode *pDnode = dmInstance();
|
||||
SEpSet epSet = {0};
|
||||
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
||||
|
||||
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
||||
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
|
||||
epSet.inUse = (i + 1) % epSet.numOfEps;
|
||||
}
|
||||
|
||||
epSet.eps[i].port = htons(epSet.eps[i].port);
|
||||
}
|
||||
|
||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
||||
SMEpSet msg = {.epSet = epSet};
|
||||
int32_t contLen = tSerializeSMEpSet(NULL, 0, &msg);
|
||||
rsp.pCont = rpcMallocCont(contLen);
|
||||
if (rsp.pCont == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tSerializeSMEpSet(rsp.pCont, contLen, &msg);
|
||||
rsp.contLen = contLen;
|
||||
}
|
||||
|
||||
dmSendRsp(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
}
|
||||
|
|
|
@ -173,6 +173,7 @@ int32_t dmReadEps(SDnodeData *pData);
|
|||
int32_t dmWriteEps(SDnodeData *pData);
|
||||
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
|
||||
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -314,6 +314,17 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
|||
taosThreadRwlockUnlock(&pData->lock);
|
||||
}
|
||||
|
||||
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dmGetMnodeEpSet(pData, pEpSet);
|
||||
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, pEpSet->numOfEps, pEpSet->inUse);
|
||||
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||
dDebug("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) {
|
||||
pEpSet->inUse = (i + 1) % pEpSet->numOfEps;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
||||
taosThreadRwlockWrlock(&pData->lock);
|
||||
pData->mnodeEps = *pEpSet;
|
||||
|
|
|
@ -994,9 +994,6 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
|||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
pAction->errCode = terrno;
|
||||
if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) {
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -483,12 +483,12 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes <= pAlterTbReq->bytes) {
|
||||
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes > pAlterTbReq->colModBytes) {
|
||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->sver++;
|
||||
pColumn->bytes = pAlterTbReq->bytes;
|
||||
pColumn->bytes = pAlterTbReq->colModBytes;
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:
|
||||
if (pColumn == NULL) {
|
||||
|
|
|
@ -2775,7 +2775,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
|
||||
win->ekey = key;
|
||||
if (rv != TD_ROW_SVER(row)) {
|
||||
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 1);
|
||||
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, TD_ROW_SVER(row));
|
||||
rv = TD_ROW_SVER(row);
|
||||
}
|
||||
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
|
||||
|
|
|
@ -77,7 +77,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
pVnode->path = (char *)&pVnode[1];
|
||||
strcpy(pVnode->path, path);
|
||||
pVnode->config = info.config;
|
||||
pVnode->state = info.state;
|
||||
pVnode->state.committed = info.state.committed;
|
||||
pVnode->state.applied = info.state.committed;
|
||||
pVnode->pTfs = pTfs;
|
||||
pVnode->msgCb = msgCb;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1903,7 +1903,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchIn
|
|||
filterFreeInfo(filter);
|
||||
|
||||
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
||||
blockDataUpdateTsWindow(pBlock);
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
}
|
||||
|
||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
|
||||
|
@ -2072,7 +2072,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
|||
}
|
||||
|
||||
qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId);
|
||||
blockDataUpdateTsWindow(pBlock);
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2797,7 +2797,9 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
|||
}
|
||||
|
||||
pRes->info.rows = numOfRows;
|
||||
blockDataUpdateTsWindow(pRes);
|
||||
|
||||
// todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
|
||||
blockDataUpdateTsWindow(pRes, 0);
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
|
||||
|
|
|
@ -553,7 +553,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->pageIndex += 1;
|
||||
|
||||
blockDataUpdateTsWindow(pInfo->binfo.pRes);
|
||||
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
|
||||
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
|
|
@ -689,7 +689,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
|
|||
}
|
||||
pDataBlock->info.rows = size;
|
||||
pDataBlock->info.type = STREAM_REPROCESS;
|
||||
blockDataUpdateTsWindow(pDataBlock);
|
||||
blockDataUpdateTsWindow(pDataBlock, 0);
|
||||
taosArrayClear(pInfo->tsArray);
|
||||
return pDataBlock;
|
||||
}
|
||||
|
@ -899,7 +899,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
rows = pBlockInfo->rows;
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||
blockDataUpdateTsWindow(pInfo->pRes);
|
||||
blockDataUpdateTsWindow(pInfo->pRes, 0);
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -972,9 +972,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
|
|||
}
|
||||
|
||||
pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
|
||||
if (pInfo->pUpdateInfo == NULL) {
|
||||
goto _error;
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan
|
||||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
}
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
|
|
|
@ -621,7 +621,7 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd
|
|||
}
|
||||
}
|
||||
|
||||
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
||||
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
||||
uint64_t tableGroupId) {
|
||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
|
||||
|
||||
|
@ -639,13 +639,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
// int32_t prevIndex = pResultRowInfo->curPos;
|
||||
|
||||
TSKEY* tsCols = NULL;
|
||||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
if (pBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
|
||||
if (tsCols != NULL) {
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t startPos = 0;
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
|
||||
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan);
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
|
||||
pInfo->interval.precision, &pInfo->win);
|
||||
|
@ -670,7 +674,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
int32_t forwardStep = 0;
|
||||
TSKEY ekey = ascScan? win.ekey:win.skey;
|
||||
forwardStep =
|
||||
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
ASSERT(forwardStep > 0);
|
||||
|
||||
// prev time window not interpolation yet.
|
||||
|
@ -686,7 +690,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
}
|
||||
|
||||
STimeWindow w = pRes->win;
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, tableGroupId,
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &w, masterScan, &pResult, tableGroupId,
|
||||
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
|
||||
pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -694,17 +698,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
}
|
||||
|
||||
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
||||
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
|
||||
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
|
||||
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
|
||||
|
||||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
|
||||
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
// restore current time window
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId,
|
||||
ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, tableGroupId,
|
||||
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
|
||||
pTaskInfo);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -714,17 +718,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
#endif
|
||||
|
||||
// window start key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
|
||||
doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
|
||||
pInfo->order, false);
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
STimeWindow nextWin = win;
|
||||
while (1) {
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -748,20 +752,20 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
|
||||
ekey = ascScan? nextWin.ekey:nextWin.skey;
|
||||
forwardStep =
|
||||
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
|
||||
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
|
||||
doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
|
||||
pInfo->order, false);
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
}
|
||||
|
||||
if (pInfo->timeWindowInterpo) {
|
||||
int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0;
|
||||
saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols);
|
||||
int32_t rowIndex = ascScan ? (pBlock->info.rows - 1) : 0;
|
||||
saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols);
|
||||
}
|
||||
|
||||
return pUpdated;
|
||||
|
@ -938,8 +942,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
STimeWindow win = pTaskInfo->window;
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
|
@ -952,6 +955,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||
|
||||
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
||||
}
|
||||
|
||||
|
@ -1429,6 +1434,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||
|
||||
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
||||
}
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->current += 1;
|
||||
|
||||
blockDataUpdateTsWindow(pBlock);
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
|
|
|
@ -241,7 +241,7 @@ alter_table_clause(A) ::=
|
|||
alter_table_clause(A) ::=
|
||||
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
|
||||
alter_table_clause(A) ::=
|
||||
full_table_name(B) SET TAG column_name(C) NK_EQ literal(D). { A = createAlterTableSetTag(pCxt, B, &C, releaseRawExprNode(pCxt, D)); }
|
||||
full_table_name(B) SET TAG column_name(C) NK_EQ signed_literal(D). { A = createAlterTableSetTag(pCxt, B, &C, D); }
|
||||
|
||||
%type multi_create_clause { SNodeList* }
|
||||
%destructor multi_create_clause { nodesDestroyList($$); }
|
||||
|
@ -448,7 +448,7 @@ agg_func_opt(A) ::= AGGREGATE.
|
|||
%type bufsize_opt { int32_t }
|
||||
%destructor bufsize_opt { }
|
||||
bufsize_opt(A) ::= . { A = 0; }
|
||||
bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). { A = strtol(B.z, NULL, 10); }
|
||||
bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B). { A = taosStr2Int32(B.z, NULL, 10); }
|
||||
|
||||
/************************************************ create/drop stream **************************************************/
|
||||
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A)
|
||||
|
|
|
@ -694,66 +694,110 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
|||
return translateValueImpl(pCxt, pVal, pVal->node.resType);
|
||||
}
|
||||
|
||||
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
if (nodesIsUnaryOp(pOp)) {
|
||||
if (OP_TYPE_MINUS == pOp->opType) {
|
||||
if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
}
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
} else {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
static bool isMultiResFunc(SNode* pNode) {
|
||||
if (NULL == pNode) {
|
||||
return false;
|
||||
}
|
||||
SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
|
||||
SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
|
||||
if (nodesIsArithmeticOp(pOp)) {
|
||||
if (TSDB_DATA_TYPE_JSON == ldt.type || TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type ||
|
||||
TSDB_DATA_TYPE_BLOB == rdt.type) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
if (QUERY_NODE_FUNCTION != nodeType(pNode) || !fmIsMultiResFunc(((SFunctionNode*)pNode)->funcId)) {
|
||||
return false;
|
||||
}
|
||||
SNodeList* pParameterList = ((SFunctionNode*)pNode)->pParameterList;
|
||||
if (LIST_LENGTH(pParameterList) > 1) {
|
||||
return true;
|
||||
}
|
||||
SNode* pParam = nodesListGetNode(pParameterList, 0);
|
||||
return (QUERY_NODE_COLUMN == nodeType(pParam) ? 0 == strcmp(((SColumnNode*)pParam)->colName, "*") : false);
|
||||
}
|
||||
|
||||
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
} else {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
}
|
||||
} else if (nodesIsComparisonOp(pOp)) {
|
||||
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
||||
((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType;
|
||||
}
|
||||
if (nodesIsRegularOp(pOp)) {
|
||||
if (!IS_STR_DATA_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
}
|
||||
if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || !IS_STR_DATA_TYPE(((SExprNode*)(pOp->pRight))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
static EDealRes translateUnaryOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
if (OP_TYPE_MINUS == pOp->opType) {
|
||||
if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
}
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
} else {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
} else if (nodesIsJsonOp(pOp)) {
|
||||
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes translateArithmeticOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
|
||||
SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
|
||||
if (TSDB_DATA_TYPE_JSON == ldt.type || TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type ||
|
||||
TSDB_DATA_TYPE_BLOB == rdt.type) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && (IS_VAR_DATA_TYPE(rdt.type) || IS_FLOAT_TYPE(rdt.type))) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && (IS_VAR_DATA_TYPE(ldt.type) || IS_FLOAT_TYPE(ldt.type)))) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
|
||||
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_INTEGER_TYPE(rdt.type)) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_INTEGER_TYPE(ldt.type)) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_BOOL == rdt.type) ||
|
||||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && TSDB_DATA_TYPE_BOOL == ldt.type)) {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
} else {
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
|
||||
SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
|
||||
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
|
||||
((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType;
|
||||
}
|
||||
if (nodesIsRegularOp(pOp)) {
|
||||
if (!IS_STR_DATA_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
}
|
||||
if (QUERY_NODE_VALUE != nodeType(pOp->pRight) || !IS_STR_DATA_TYPE(((SExprNode*)(pOp->pRight))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_JSON;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_JSON].bytes;
|
||||
}
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes translateJsonOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
SDataType ldt = ((SExprNode*)(pOp->pLeft))->resType;
|
||||
SDataType rdt = ((SExprNode*)(pOp->pRight))->resType;
|
||||
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_JSON;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_JSON].bytes;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
if (isMultiResFunc(pOp->pLeft)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
}
|
||||
if (isMultiResFunc(pOp->pRight)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
|
||||
}
|
||||
|
||||
if (nodesIsUnaryOp(pOp)) {
|
||||
return translateUnaryOperator(pCxt, pOp);
|
||||
} else if (nodesIsArithmeticOp(pOp)) {
|
||||
return translateArithmeticOperator(pCxt, pOp);
|
||||
} else if (nodesIsComparisonOp(pOp)) {
|
||||
return translateComparisonOperator(pCxt, pOp);
|
||||
} else if (nodesIsJsonOp(pOp)) {
|
||||
return translateJsonOperator(pCxt, pOp);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
@ -808,6 +852,13 @@ static bool hasInvalidFuncNesting(SNodeList* pParameterList) {
|
|||
}
|
||||
|
||||
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||
SNode* pParam = NULL;
|
||||
FOREACH(pParam, pFunc->pParameterList) {
|
||||
if (isMultiResFunc(pParam)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)pParam)->aliasName);
|
||||
}
|
||||
}
|
||||
|
||||
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
|
||||
.pRpc = pCxt->pParseCxt->pTransporter,
|
||||
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
|
||||
|
@ -926,9 +977,10 @@ typedef struct SCheckExprForGroupByCxt {
|
|||
STranslateContext* pTranslateCxt;
|
||||
int32_t selectFuncNum;
|
||||
bool hasSelectValFunc;
|
||||
bool hasOtherAggFunc;
|
||||
} SCheckExprForGroupByCxt;
|
||||
|
||||
static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, bool* pHasSelectValFunc, SNode** pNode) {
|
||||
static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode) {
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -942,9 +994,6 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, bool* pHasSel
|
|||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
*pNode = (SNode*)pFunc;
|
||||
if (NULL != pHasSelectValFunc) {
|
||||
*pHasSelectValFunc = true;
|
||||
}
|
||||
} else {
|
||||
nodesDestroyNode(pFunc);
|
||||
}
|
||||
|
@ -956,8 +1005,12 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
|||
if (!nodesIsExprNode(*pNode) || isAliasColumn(*pNode)) {
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
pCxt->selectFuncNum += isSelectFunc(*pNode) ? 1 : 0;
|
||||
if (pCxt->selectFuncNum > 1 && pCxt->hasSelectValFunc) {
|
||||
if (isSelectFunc(*pNode)) {
|
||||
++(pCxt->selectFuncNum);
|
||||
} else if (isAggFunc(*pNode)) {
|
||||
pCxt->hasOtherAggFunc = true;
|
||||
}
|
||||
if ((pCxt->selectFuncNum > 1 && pCxt->hasSelectValFunc) || (pCxt->hasOtherAggFunc && pCxt->hasSelectValFunc)) {
|
||||
return generateDealNodeErrMsg(pCxt->pTranslateCxt, getGroupByErrorCode(pCxt->pTranslateCxt));
|
||||
}
|
||||
if (isAggFunc(*pNode) && !isDistinctOrderBy(pCxt->pTranslateCxt)) {
|
||||
|
@ -970,10 +1023,11 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
|||
}
|
||||
}
|
||||
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
if (pCxt->selectFuncNum > 1) {
|
||||
if (pCxt->selectFuncNum > 1 || pCxt->hasOtherAggFunc) {
|
||||
return generateDealNodeErrMsg(pCxt->pTranslateCxt, getGroupByErrorCode(pCxt->pTranslateCxt));
|
||||
} else {
|
||||
return rewriteColToSelectValFunc(pCxt->pTranslateCxt, &pCxt->hasSelectValFunc, pNode);
|
||||
pCxt->hasSelectValFunc = true;
|
||||
return rewriteColToSelectValFunc(pCxt->pTranslateCxt, pNode);
|
||||
}
|
||||
}
|
||||
if (isAggFunc(*pNode) && isDistinctOrderBy(pCxt->pTranslateCxt)) {
|
||||
|
@ -983,7 +1037,8 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
|||
}
|
||||
|
||||
static int32_t checkExprForGroupBy(STranslateContext* pCxt, SNode** pNode) {
|
||||
SCheckExprForGroupByCxt cxt = {.pTranslateCxt = pCxt, .selectFuncNum = 0, .hasSelectValFunc = false};
|
||||
SCheckExprForGroupByCxt cxt = {
|
||||
.pTranslateCxt = pCxt, .selectFuncNum = 0, .hasSelectValFunc = false, .hasOtherAggFunc = false};
|
||||
nodesRewriteExpr(pNode, doCheckExprForGroupBy, &cxt);
|
||||
if (cxt.selectFuncNum != 1 && cxt.hasSelectValFunc) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, getGroupByErrorCode(pCxt));
|
||||
|
@ -995,7 +1050,8 @@ static int32_t checkExprListForGroupBy(STranslateContext* pCxt, SNodeList* pList
|
|||
if (NULL == getGroupByList(pCxt)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SCheckExprForGroupByCxt cxt = {.pTranslateCxt = pCxt, .selectFuncNum = 0, .hasSelectValFunc = false};
|
||||
SCheckExprForGroupByCxt cxt = {
|
||||
.pTranslateCxt = pCxt, .selectFuncNum = 0, .hasSelectValFunc = false, .hasOtherAggFunc = false};
|
||||
nodesRewriteExprs(pList, doCheckExprForGroupBy, &cxt);
|
||||
if (cxt.selectFuncNum != 1 && cxt.hasSelectValFunc) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, getGroupByErrorCode(pCxt));
|
||||
|
@ -1008,7 +1064,7 @@ static EDealRes rewriteColsToSelectValFuncImpl(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
return rewriteColToSelectValFunc((STranslateContext*)pContext, NULL, pNode);
|
||||
return rewriteColToSelectValFunc((STranslateContext*)pContext, pNode);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
@ -1027,11 +1083,16 @@ typedef struct CheckAggColCoexistCxt {
|
|||
bool existCol;
|
||||
bool existNonstdFunc;
|
||||
int32_t selectFuncNum;
|
||||
bool existOtherAggFunc;
|
||||
} CheckAggColCoexistCxt;
|
||||
|
||||
static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) {
|
||||
CheckAggColCoexistCxt* pCxt = (CheckAggColCoexistCxt*)pContext;
|
||||
pCxt->selectFuncNum += isSelectFunc(pNode) ? 1 : 0;
|
||||
if (isSelectFunc(pNode)) {
|
||||
++(pCxt->selectFuncNum);
|
||||
} else if (isAggFunc(pNode)) {
|
||||
pCxt->existOtherAggFunc = true;
|
||||
}
|
||||
if (isAggFunc(pNode)) {
|
||||
pCxt->existAggFunc = true;
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
|
@ -1050,13 +1111,17 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
if (NULL != pSelect->pGroupByList) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
CheckAggColCoexistCxt cxt = {
|
||||
.pTranslateCxt = pCxt, .existAggFunc = false, .existCol = false, .existNonstdFunc = false};
|
||||
CheckAggColCoexistCxt cxt = {.pTranslateCxt = pCxt,
|
||||
.existAggFunc = false,
|
||||
.existCol = false,
|
||||
.existNonstdFunc = false,
|
||||
.selectFuncNum = 0,
|
||||
.existOtherAggFunc = false};
|
||||
nodesWalkExprs(pSelect->pProjectionList, doCheckAggColCoexist, &cxt);
|
||||
if (!pSelect->isDistinct) {
|
||||
nodesWalkExprs(pSelect->pOrderByList, doCheckAggColCoexist, &cxt);
|
||||
}
|
||||
if (1 == cxt.selectFuncNum) {
|
||||
if (1 == cxt.selectFuncNum && !cxt.existOtherAggFunc) {
|
||||
return rewriteColsToSelectValFunc(pCxt, pSelect);
|
||||
}
|
||||
if ((cxt.selectFuncNum > 1 || cxt.existAggFunc || NULL != pSelect->pWindow) && cxt.existCol) {
|
||||
|
@ -1230,18 +1295,6 @@ static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isMultiResFunc(SNode* pNode) {
|
||||
if (QUERY_NODE_FUNCTION != nodeType(pNode) || !fmIsMultiResFunc(((SFunctionNode*)pNode)->funcId)) {
|
||||
return false;
|
||||
}
|
||||
SNodeList* pParameterList = ((SFunctionNode*)pNode)->pParameterList;
|
||||
if (LIST_LENGTH(pParameterList) > 1) {
|
||||
return true;
|
||||
}
|
||||
SNode* pParam = nodesListGetNode(pParameterList, 0);
|
||||
return (QUERY_NODE_COLUMN == nodeType(pParam) ? 0 == strcmp(((SColumnNode*)pParam)->colName, "*") : false);
|
||||
}
|
||||
|
||||
static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
|
||||
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
|
@ -1872,7 +1925,7 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
|
|||
}
|
||||
|
||||
static bool dataTypeEqual(const SDataType* l, const SDataType* r) {
|
||||
return (l->type == r->type && l->bytes == l->bytes && l->precision == r->precision && l->scale == l->scale);
|
||||
return (l->type == r->type && l->bytes == r->bytes && l->precision == r->precision && l->scale == r->scale);
|
||||
}
|
||||
|
||||
static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType dt, SNode** pCast) {
|
||||
|
@ -2726,9 +2779,11 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
|
|||
|
||||
SName tableName;
|
||||
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &tableName), pReq->name);
|
||||
collectUseTable(&tableName, pCxt->pTables);
|
||||
|
||||
return buildRollupAst(pCxt, pStmt, pReq);
|
||||
int32_t code = collectUseTable(&tableName, pCxt->pTables);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildRollupAst(pCxt, pStmt, pReq);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
|
@ -4033,13 +4088,18 @@ static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* p
|
|||
return scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal);
|
||||
}
|
||||
|
||||
static int32_t colDataBytesToValueDataBytes(uint8_t type, int32_t bytes) {
|
||||
if (TSDB_DATA_TYPE_VARCHAR == type || TSDB_DATA_TYPE_BINARY == type || TSDB_DATA_TYPE_VARBINARY == type) {
|
||||
return bytes - VARSTR_HEADER_SIZE;
|
||||
} else if (TSDB_DATA_TYPE_NCHAR == type) {
|
||||
return (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
static SDataType schemaToDataType(SSchema* pSchema) {
|
||||
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes, .precision = 0, .scale = 0};
|
||||
if (TSDB_DATA_TYPE_VARCHAR == dt.type || TSDB_DATA_TYPE_BINARY == dt.type || TSDB_DATA_TYPE_VARBINARY == dt.type) {
|
||||
dt.bytes -= VARSTR_HEADER_SIZE;
|
||||
} else if (TSDB_DATA_TYPE_NCHAR == dt.type) {
|
||||
dt.bytes = (dt.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||
}
|
||||
dt.bytes = colDataBytesToValueDataBytes(pSchema->type, pSchema->bytes);
|
||||
return dt;
|
||||
}
|
||||
|
||||
|
@ -4441,7 +4501,8 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
|
|||
SSchema* pSchema = getColSchema(pTableMeta, pStmt->colName);
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pStmt->colName);
|
||||
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->bytes >= pReq->colModBytes) {
|
||||
} else if (!IS_VAR_DATA_TYPE(pSchema->type) || pSchema->type != pStmt->dataType.type ||
|
||||
pSchema->bytes >= pReq->colModBytes) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -239,6 +239,10 @@ TEST_F(ParserSelectTest, semanticError) {
|
|||
// TSDB_CODE_PAR_WRONG_VALUE_TYPE
|
||||
run("SELECT timestamp '2010a' FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE);
|
||||
|
||||
run("SELECT LAST(*) + SUM(c1) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE);
|
||||
|
||||
run("SELECT CEIL(LAST(ts, c1)) FROM t1", TSDB_CODE_PAR_WRONG_VALUE_TYPE, PARSER_STAGE_TRANSLATE);
|
||||
|
||||
// TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION
|
||||
run("SELECT c2 FROM t1 tt1 join t1 tt2 on COUNT(*) > 0", TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION,
|
||||
PARSER_STAGE_TRANSLATE);
|
||||
|
|
|
@ -248,6 +248,7 @@ static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode)
|
|||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->pNode = pNode;
|
||||
pNode->pParent = NULL;
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
|
@ -408,17 +409,30 @@ static const SSplitRule splitRuleSet[] = {{.pName = "SuperTableScan", .splitFunc
|
|||
|
||||
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
||||
|
||||
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
|
||||
char* pStr = NULL;
|
||||
nodesNodeToString(pSubplan, false, &pStr, NULL);
|
||||
qDebugL("apply %s rule: %s", pRuleName, pStr);
|
||||
taosMemoryFree(pStr);
|
||||
}
|
||||
|
||||
static int32_t applySplitRule(SLogicSubplan* pSubplan) {
|
||||
SSplitContext cxt = {.queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
|
||||
bool split = false;
|
||||
do {
|
||||
cxt.split = false;
|
||||
split = false;
|
||||
for (int32_t i = 0; i < splitRuleNum; ++i) {
|
||||
cxt.split = false;
|
||||
int32_t code = splitRuleSet[i].splitFunc(&cxt, pSubplan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
if (cxt.split) {
|
||||
split = true;
|
||||
dumpLogicSubplan(splitRuleSet[i].pName, pSubplan);
|
||||
}
|
||||
}
|
||||
} while (cxt.split);
|
||||
} while (split);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,12 @@ class PlanSetOpTest : public PlannerTestBase {};
|
|||
TEST_F(PlanSetOpTest, unionAll) {
|
||||
useDb("root", "test");
|
||||
|
||||
// sql 1: single UNION ALL operator
|
||||
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20");
|
||||
// sql 2: multi UNION ALL operator
|
||||
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 "
|
||||
"UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 "
|
||||
"UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 30");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, unionAllSubquery) {
|
||||
|
@ -44,7 +49,12 @@ TEST_F(PlanSetOpTest, unionAllWithSubquery) {
|
|||
TEST_F(PlanSetOpTest, union) {
|
||||
useDb("root", "test");
|
||||
|
||||
// single UNION operator
|
||||
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION SELECT c1, c2 FROM t1 WHERE c1 > 20");
|
||||
// multi UNION operator
|
||||
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 "
|
||||
"UNION SELECT c1, c2 FROM t1 WHERE c1 > 20 "
|
||||
"UNION SELECT c1, c2 FROM t1 WHERE c1 > 30");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, unionContainJoin) {
|
||||
|
@ -62,3 +72,12 @@ TEST_F(PlanSetOpTest, unionSubquery) {
|
|||
|
||||
run("SELECT * FROM (SELECT c1, c2 FROM t1 UNION SELECT c1, c2 FROM t1)");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, bug001) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT c2 FROM t1 WHERE c1 IS NOT NULL GROUP BY c2 "
|
||||
"UNION "
|
||||
"SELECT 'abcdefghijklmnopqrstuvwxyz' FROM t1 "
|
||||
"WHERE 'abcdefghijklmnopqrstuvwxyz' IS NOT NULL GROUP BY 'abcdefghijklmnopqrstuvwxyz'");
|
||||
}
|
||||
|
|
|
@ -23,9 +23,7 @@ class PlanSubqeuryTest : public PlannerTestBase {};
|
|||
TEST_F(PlanSubqeuryTest, basic) {
|
||||
useDb("root", "test");
|
||||
|
||||
if (0 == g_skipSql) {
|
||||
run("SELECT * FROM (SELECT * FROM t1)");
|
||||
}
|
||||
run("SELECT * FROM (SELECT * FROM t1)");
|
||||
|
||||
run("SELECT LAST(c1) FROM (SELECT * FROM t1)");
|
||||
}
|
||||
|
|
|
@ -35,18 +35,19 @@ class PlannerEnv : public testing::Environment {
|
|||
|
||||
private:
|
||||
void initLog(const char* path) {
|
||||
dDebugFlag = 143;
|
||||
vDebugFlag = 0;
|
||||
mDebugFlag = 143;
|
||||
cDebugFlag = 0;
|
||||
jniDebugFlag = 0;
|
||||
tmrDebugFlag = 135;
|
||||
uDebugFlag = 135;
|
||||
rpcDebugFlag = 143;
|
||||
qDebugFlag = 143;
|
||||
wDebugFlag = 0;
|
||||
sDebugFlag = 0;
|
||||
tsdbDebugFlag = 0;
|
||||
int32_t logLevel = getLogLevel();
|
||||
dDebugFlag = logLevel;
|
||||
vDebugFlag = logLevel;
|
||||
mDebugFlag = logLevel;
|
||||
cDebugFlag = logLevel;
|
||||
jniDebugFlag = logLevel;
|
||||
tmrDebugFlag = logLevel;
|
||||
uDebugFlag = logLevel;
|
||||
rpcDebugFlag = logLevel;
|
||||
qDebugFlag = logLevel;
|
||||
wDebugFlag = logLevel;
|
||||
sDebugFlag = logLevel;
|
||||
tsdbDebugFlag = logLevel;
|
||||
tsLogEmbedded = 1;
|
||||
tsAsyncLog = 0;
|
||||
|
||||
|
@ -60,17 +61,26 @@ class PlannerEnv : public testing::Environment {
|
|||
};
|
||||
|
||||
static void parseArg(int argc, char* argv[]) {
|
||||
int opt = 0;
|
||||
const char* optstring = "";
|
||||
int opt = 0;
|
||||
const char* optstring = "";
|
||||
// clang-format off
|
||||
static struct option long_options[] = {
|
||||
{"dump", optional_argument, NULL, 'd'}, {"skipSql", optional_argument, NULL, 's'}, {0, 0, 0, 0}};
|
||||
{"dump", optional_argument, NULL, 'd'},
|
||||
{"skipSql", required_argument, NULL, 's'},
|
||||
{"log", required_argument, NULL, 'l'},
|
||||
{0, 0, 0, 0}
|
||||
};
|
||||
// clang-format on
|
||||
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
|
||||
switch (opt) {
|
||||
case 'd':
|
||||
setDumpModule(optarg);
|
||||
break;
|
||||
case 's':
|
||||
g_skipSql = 1;
|
||||
setSkipSqlNum(optarg);
|
||||
break;
|
||||
case 'l':
|
||||
setLogLevel(optarg);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -48,6 +48,7 @@ enum DumpModule {
|
|||
|
||||
DumpModule g_dumpModule = DUMP_MODULE_NOTHING;
|
||||
int32_t g_skipSql = 0;
|
||||
int32_t g_logLevel = 131;
|
||||
|
||||
void setDumpModule(const char* pModule) {
|
||||
if (NULL == pModule) {
|
||||
|
@ -71,14 +72,26 @@ void setDumpModule(const char* pModule) {
|
|||
}
|
||||
}
|
||||
|
||||
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(optarg); }
|
||||
|
||||
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); }
|
||||
|
||||
int32_t getLogLevel() { return g_logLevel; }
|
||||
|
||||
class PlannerTestBaseImpl {
|
||||
public:
|
||||
void useDb(const string& acctId, const string& db) {
|
||||
caseEnv_.acctId_ = acctId;
|
||||
caseEnv_.db_ = db;
|
||||
caseEnv_.nsql_ = g_skipSql;
|
||||
}
|
||||
|
||||
void run(const string& sql) {
|
||||
if (caseEnv_.nsql_ > 0) {
|
||||
--(caseEnv_.nsql_);
|
||||
return;
|
||||
}
|
||||
|
||||
reset();
|
||||
try {
|
||||
SQuery* pQuery = nullptr;
|
||||
|
@ -109,6 +122,10 @@ class PlannerTestBaseImpl {
|
|||
}
|
||||
|
||||
void prepare(const string& sql) {
|
||||
if (caseEnv_.nsql_ > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
reset();
|
||||
try {
|
||||
doParseSql(sql, &stmtEnv_.pQuery_, true);
|
||||
|
@ -119,6 +136,10 @@ class PlannerTestBaseImpl {
|
|||
}
|
||||
|
||||
void bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx) {
|
||||
if (caseEnv_.nsql_ > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doBindParams(stmtEnv_.pQuery_, pParams, colIdx);
|
||||
} catch (...) {
|
||||
|
@ -128,6 +149,11 @@ class PlannerTestBaseImpl {
|
|||
}
|
||||
|
||||
void exec() {
|
||||
if (caseEnv_.nsql_ > 0) {
|
||||
--(caseEnv_.nsql_);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
doParseBoundSql(stmtEnv_.pQuery_);
|
||||
|
||||
|
@ -157,8 +183,9 @@ class PlannerTestBaseImpl {
|
|||
|
||||
private:
|
||||
struct caseEnv {
|
||||
string acctId_;
|
||||
string db_;
|
||||
string acctId_;
|
||||
string db_;
|
||||
int32_t nsql_;
|
||||
};
|
||||
|
||||
struct stmtEnv {
|
||||
|
|
|
@ -37,8 +37,9 @@ class PlannerTestBase : public testing::Test {
|
|||
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
||||
};
|
||||
|
||||
extern int32_t g_skipSql;
|
||||
|
||||
extern void setDumpModule(const char* pModule);
|
||||
extern void setDumpModule(const char* pModule);
|
||||
extern void setSkipSqlNum(const char* pNum);
|
||||
extern void setLogLevel(const char* pLogLevel);
|
||||
extern int32_t getLogLevel();
|
||||
|
||||
#endif // PLAN_TEST_UTIL_H
|
||||
|
|
|
@ -45,32 +45,10 @@ STaosError errors[] = {
|
|||
{.val = 0, .str = "success"},
|
||||
#endif
|
||||
|
||||
// rpc
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ACTION_IN_PROGRESS, "Action in progress")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_REQUIRED, "Authentication required")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Redirect")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NOT_READY, "System not ready") // peer is not ready to process data
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ALREADY_PROCESSED, "Message already processed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED, "Last session not finished")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MISMATCHED_LINK_ID, "Mismatched meter id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TOO_SLOW, "Processing of request timed out")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "Number of sessions reached limit") // too many sessions
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_APP_ERROR, "Unexpected generic error in RPC")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_UNEXPECTED_RESPONSE, "Unexpected response")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VALUE, "Invalid value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TRAN_ID, "Invalid transaction id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_SESSION_ID, "Invalid session id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, "Invalid message type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, "Invalid response type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's time is not synchronized")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "port already in use")
|
||||
|
||||
//common & util
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, "Action in progress")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_ERROR, "Unexpected generic error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "Out of Memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RANGE, "Out of range")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Shared memory")
|
||||
|
@ -104,6 +82,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
|
||||
|
||||
// rpc
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_REDIRECT, "Redirect")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_AUTH_FAILURE, "Authentication failure")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
|
||||
|
||||
//client
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_QHANDLE, "Invalid qhandle")
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "thash.h"
|
||||
#include "taoserror.h"
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
|
||||
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
|
||||
|
@ -27,36 +27,36 @@
|
|||
|
||||
#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR)
|
||||
|
||||
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
|
||||
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
|
||||
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
|
||||
#define GET_HASH_NODE_KEY(_n) ((char *)(_n) + sizeof(SHashNode) + (_n)->dataLen)
|
||||
#define GET_HASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHashNode))
|
||||
#define GET_HASH_PNODE(_n) ((SHashNode *)((char *)(_n) - sizeof(SHashNode)))
|
||||
|
||||
#define FREE_HASH_NODE(_fp, _n) \
|
||||
do { \
|
||||
/* if (_fp != NULL) { \
|
||||
(_fp)(_n); \
|
||||
}*/ \
|
||||
/* if (_fp != NULL) { \
|
||||
(_fp)(_n); \
|
||||
}*/ \
|
||||
taosMemoryFreeClear(_n); \
|
||||
} while (0);
|
||||
|
||||
struct SHashNode {
|
||||
SHashNode *next;
|
||||
uint32_t hashVal; // the hash value of key
|
||||
uint32_t dataLen; // length of data
|
||||
uint32_t keyLen; // length of the key
|
||||
uint16_t refCount; // reference count
|
||||
int8_t removed; // flag to indicate removed
|
||||
char data[];
|
||||
SHashNode *next;
|
||||
uint32_t hashVal; // the hash value of key
|
||||
uint32_t dataLen; // length of data
|
||||
uint32_t keyLen; // length of the key
|
||||
uint16_t refCount; // reference count
|
||||
int8_t removed; // flag to indicate removed
|
||||
char data[];
|
||||
};
|
||||
|
||||
typedef struct SHashEntry {
|
||||
int32_t num; // number of elements in current entry
|
||||
SRWLatch latch; // entry latch
|
||||
SHashNode *next;
|
||||
int32_t num; // number of elements in current entry
|
||||
SRWLatch latch; // entry latch
|
||||
SHashNode *next;
|
||||
} SHashEntry;
|
||||
|
||||
struct SHashObj {
|
||||
SHashEntry ** hashList;
|
||||
SHashEntry **hashList;
|
||||
size_t capacity; // number of slots
|
||||
int64_t size; // number of elements in hash table
|
||||
_hash_fn_t hashFp; // hash function
|
||||
|
@ -65,7 +65,7 @@ struct SHashObj {
|
|||
SRWLatch lock; // read-write spin lock
|
||||
SHashLockTypeE type; // lock type
|
||||
bool enableUpdate; // enable update
|
||||
SArray * pMemBlock; // memory block allocated for SHashEntry
|
||||
SArray *pMemBlock; // memory block allocated for SHashEntry
|
||||
_hash_before_fn_t callbackFp; // function invoked before return the value to caller
|
||||
};
|
||||
|
||||
|
@ -103,14 +103,14 @@ static FORCE_INLINE void taosHashRUnlock(SHashObj *pHashObj) {
|
|||
taosRUnLockLatch(&pHashObj->lock);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void taosHashEntryWLock(const SHashObj *pHashObj, SHashEntry* pe) {
|
||||
static FORCE_INLINE void taosHashEntryWLock(const SHashObj *pHashObj, SHashEntry *pe) {
|
||||
if (pHashObj->type == HASH_NO_LOCK) {
|
||||
return;
|
||||
}
|
||||
taosWLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void taosHashEntryWUnlock(const SHashObj *pHashObj, SHashEntry* pe) {
|
||||
static FORCE_INLINE void taosHashEntryWUnlock(const SHashObj *pHashObj, SHashEntry *pe) {
|
||||
if (pHashObj->type == HASH_NO_LOCK) {
|
||||
return;
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ static FORCE_INLINE void taosHashEntryWUnlock(const SHashObj *pHashObj, SHashEnt
|
|||
taosWUnLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void taosHashEntryRLock(const SHashObj *pHashObj, SHashEntry* pe) {
|
||||
static FORCE_INLINE void taosHashEntryRLock(const SHashObj *pHashObj, SHashEntry *pe) {
|
||||
if (pHashObj->type == HASH_NO_LOCK) {
|
||||
return;
|
||||
}
|
||||
|
@ -126,7 +126,7 @@ static FORCE_INLINE void taosHashEntryRLock(const SHashObj *pHashObj, SHashEntry
|
|||
taosRLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void taosHashEntryRUnlock(const SHashObj *pHashObj, SHashEntry* pe) {
|
||||
static FORCE_INLINE void taosHashEntryRUnlock(const SHashObj *pHashObj, SHashEntry *pe) {
|
||||
if (pHashObj->type == HASH_NO_LOCK) {
|
||||
return;
|
||||
}
|
||||
|
@ -142,12 +142,11 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
|||
return i;
|
||||
}
|
||||
|
||||
static FORCE_INLINE SHashNode *
|
||||
doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) {
|
||||
static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen,
|
||||
uint32_t hashVal) {
|
||||
SHashNode *pNode = pe->next;
|
||||
while (pNode) {
|
||||
if ((pNode->keyLen == keyLen) &&
|
||||
((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||
pNode->removed == 0) {
|
||||
assert(pNode->hashVal == hashVal);
|
||||
break;
|
||||
|
@ -186,7 +185,8 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p
|
|||
* @param pNode the old node with requested key
|
||||
* @param pNewNode the new node with requested key
|
||||
*/
|
||||
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) {
|
||||
static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SHashNode *prev, SHashNode *pNode,
|
||||
SHashNode *pNewNode) {
|
||||
assert(pNode->keyLen == pNewNode->keyLen);
|
||||
|
||||
atomic_sub_fetch_16(&pNode->refCount, 1);
|
||||
|
@ -227,9 +227,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj);
|
|||
* @param pHashObj
|
||||
* @return
|
||||
*/
|
||||
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
|
||||
return taosHashGetSize(pHashObj) == 0;
|
||||
}
|
||||
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { return taosHashGetSize(pHashObj) == 0; }
|
||||
|
||||
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
|
||||
if (fn == NULL) {
|
||||
|
@ -251,7 +249,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
|
|||
pHashObj->capacity = taosHashCapacity((int32_t)capacity);
|
||||
|
||||
pHashObj->equalFp = memcmp;
|
||||
pHashObj->hashFp = fn;
|
||||
pHashObj->hashFp = fn;
|
||||
pHashObj->type = type;
|
||||
pHashObj->enableUpdate = update;
|
||||
|
||||
|
@ -305,7 +303,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
|
|||
if (pHashObj == NULL) {
|
||||
return 0;
|
||||
}
|
||||
return (int32_t)atomic_load_64((int64_t*)&pHashObj->size);
|
||||
return (int32_t)atomic_load_64((int64_t *)&pHashObj->size);
|
||||
}
|
||||
|
||||
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) {
|
||||
|
@ -340,10 +338,9 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
|
|||
}
|
||||
#endif
|
||||
|
||||
SHashNode* prev = NULL;
|
||||
SHashNode *prev = NULL;
|
||||
while (pNode) {
|
||||
if ((pNode->keyLen == keyLen) &&
|
||||
(*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
|
||||
if ((pNode->keyLen == keyLen) && (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 &&
|
||||
pNode->removed == 0) {
|
||||
assert(pNode->hashVal == hashVal);
|
||||
break;
|
||||
|
@ -391,27 +388,27 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
|
|||
}
|
||||
}
|
||||
|
||||
static void* taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void** d, int32_t* size, bool addRef);
|
||||
static void *taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void **d, int32_t *size, bool addRef);
|
||||
|
||||
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
void* p = NULL;
|
||||
void *p = NULL;
|
||||
return taosHashGetImpl(pHashObj, key, keyLen, &p, 0, false);
|
||||
}
|
||||
|
||||
int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf) {
|
||||
terrno = 0;
|
||||
/*char* p = */taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false);
|
||||
/*char* p = */ taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t* size) {
|
||||
int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t *size) {
|
||||
terrno = 0;
|
||||
|
||||
/*char* p = */taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false);
|
||||
/*char* p = */ taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void* taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void** d, int32_t* size, bool addRef) {
|
||||
void *taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void **d, int32_t *size, bool addRef) {
|
||||
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -449,15 +446,15 @@ void* taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void**
|
|||
|
||||
if (size != NULL) {
|
||||
if (*d == NULL) {
|
||||
*size = pNode->dataLen;
|
||||
*size = pNode->dataLen;
|
||||
*d = taosMemoryCalloc(1, *size);
|
||||
if (*d == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
} else if (*size < pNode->dataLen) {
|
||||
*size = pNode->dataLen;
|
||||
char* tmp = taosMemoryRealloc(*d, *size);
|
||||
*size = pNode->dataLen;
|
||||
char *tmp = taosMemoryRealloc(*d, *size);
|
||||
if (tmp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -508,13 +505,12 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int code = -1;
|
||||
int code = -1;
|
||||
SHashNode *pNode = pe->next;
|
||||
SHashNode *prevNode = NULL;
|
||||
|
||||
while (pNode) {
|
||||
if ((pNode->keyLen == keyLen) &&
|
||||
((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||
if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) &&
|
||||
pNode->removed == 0) {
|
||||
code = 0; // it is found
|
||||
|
||||
|
@ -598,14 +594,14 @@ void taosHashCleanup(SHashObj *pHashObj) {
|
|||
}
|
||||
|
||||
// for profile only
|
||||
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj){
|
||||
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
|
||||
if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
|
||||
taosHashRLock((SHashObj*) pHashObj);
|
||||
taosHashRLock((SHashObj *)pHashObj);
|
||||
for (int32_t i = 0; i < pHashObj->size; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
|
||||
|
@ -616,7 +612,7 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj){
|
|||
}
|
||||
}
|
||||
|
||||
taosHashRUnlock((SHashObj*) pHashObj);
|
||||
taosHashRUnlock((SHashObj *)pHashObj);
|
||||
return num;
|
||||
}
|
||||
|
||||
|
@ -627,22 +623,22 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
|||
|
||||
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
|
||||
if (newCapacity > HASH_MAX_CAPACITY) {
|
||||
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
|
||||
// pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached",
|
||||
// pHashObj->capacity, HASH_MAX_CAPACITY);
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
|
||||
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
|
||||
if (pNewEntryList == NULL) {
|
||||
// uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
// uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
return;
|
||||
}
|
||||
|
||||
pHashObj->hashList = pNewEntryList;
|
||||
|
||||
size_t inc = newCapacity - pHashObj->capacity;
|
||||
void * p = taosMemoryCalloc(inc, sizeof(SHashEntry));
|
||||
void *p = taosMemoryCalloc(inc, sizeof(SHashEntry));
|
||||
|
||||
for (int32_t i = 0; i < inc; ++i) {
|
||||
pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry));
|
||||
|
@ -653,9 +649,9 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
|||
pHashObj->capacity = newCapacity;
|
||||
for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) {
|
||||
SHashEntry *pe = pHashObj->hashList[idx];
|
||||
SHashNode *pNode;
|
||||
SHashNode *pNext;
|
||||
SHashNode *pPrev = NULL;
|
||||
SHashNode *pNode;
|
||||
SHashNode *pNext;
|
||||
SHashNode *pPrev = NULL;
|
||||
|
||||
if (pe->num == 0) {
|
||||
assert(pe->next == NULL);
|
||||
|
@ -688,24 +684,25 @@ void taosHashTableResize(SHashObj *pHashObj) {
|
|||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
|
||||
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity,
|
||||
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
||||
// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms",
|
||||
// (int32_t)pHashObj->capacity,
|
||||
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
||||
}
|
||||
|
||||
SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) {
|
||||
SHashNode *pNewNode = taosMemoryMalloc(sizeof(SHashNode) + keyLen + dsize);
|
||||
SHashNode *pNewNode = taosMemoryMalloc(sizeof(SHashNode) + keyLen + dsize + 1);
|
||||
|
||||
if (pNewNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNewNode->keyLen = (uint32_t)keyLen;
|
||||
pNewNode->keyLen = (uint32_t)keyLen;
|
||||
pNewNode->hashVal = hashVal;
|
||||
pNewNode->dataLen = (uint32_t)dsize;
|
||||
pNewNode->refCount= 1;
|
||||
pNewNode->refCount = 1;
|
||||
pNewNode->removed = 0;
|
||||
pNewNode->next = NULL;
|
||||
pNewNode->next = NULL;
|
||||
|
||||
memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize);
|
||||
memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen);
|
||||
|
@ -727,11 +724,12 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
return (pHashObj->capacity * (sizeof(SHashEntry) + sizeof(void*))) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
|
||||
return (pHashObj->capacity * (sizeof(SHashEntry) + sizeof(void *))) + sizeof(SHashNode) * taosHashGetSize(pHashObj) +
|
||||
sizeof(SHashObj);
|
||||
}
|
||||
|
||||
void *taosHashGetKey(void *data, size_t* keyLen) {
|
||||
SHashNode * node = GET_HASH_PNODE(data);
|
||||
void *taosHashGetKey(void *data, size_t *keyLen) {
|
||||
SHashNode *node = GET_HASH_PNODE(data);
|
||||
if (keyLen != NULL) {
|
||||
*keyLen = node->keyLen;
|
||||
}
|
||||
|
@ -751,8 +749,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
|||
|
||||
SHashNode *pNode = pe->next;
|
||||
while (pNode) {
|
||||
if (pNode == pOld)
|
||||
break;
|
||||
if (pNode == pOld) break;
|
||||
|
||||
prevNode = pNode;
|
||||
pNode = pNode->next;
|
||||
|
@ -766,7 +763,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
|||
}
|
||||
|
||||
atomic_sub_fetch_16(&pOld->refCount, 1);
|
||||
if (pOld->refCount <=0) {
|
||||
if (pOld->refCount <= 0) {
|
||||
if (prevNode) {
|
||||
prevNode->next = pOld->next;
|
||||
} else {
|
||||
|
@ -778,7 +775,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
|||
FREE_HASH_NODE(pHashObj->freeFp, pOld);
|
||||
}
|
||||
} else {
|
||||
// uError("pNode:%p data:%p is not there!!!", pNode, p);
|
||||
// uError("pNode:%p data:%p is not there!!!", pNode, p);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
|
@ -787,7 +784,7 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
|||
void *taosHashIterate(SHashObj *pHashObj, void *p) {
|
||||
if (pHashObj == NULL) return NULL;
|
||||
|
||||
int slot = 0;
|
||||
int slot = 0;
|
||||
char *data = NULL;
|
||||
|
||||
// only add the read lock to disable the resize process
|
||||
|
@ -865,9 +862,9 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
|||
taosHashRUnlock(pHashObj);
|
||||
}
|
||||
|
||||
//TODO remove it
|
||||
// TODO remove it
|
||||
void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
void* p = NULL;
|
||||
void *p = NULL;
|
||||
return taosHashGetImpl(pHashObj, key, keyLen, &p, 0, true);
|
||||
}
|
||||
|
||||
|
|
|
@ -1380,8 +1380,8 @@ class TDTestCase:
|
|||
self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase4(cfgPath, buildPath)
|
||||
self.tmqCase5(cfgPath, buildPath)
|
||||
self.tmqCase6(cfgPath, buildPath)
|
||||
self.tmqCase7(cfgPath, buildPath)
|
||||
#self.tmqCase6(cfgPath, buildPath)
|
||||
#self.tmqCase7(cfgPath, buildPath)
|
||||
#self.tmqCase8(cfgPath, buildPath)
|
||||
#self.tmqCase9(cfgPath, buildPath)
|
||||
#self.tmqCase10(cfgPath, buildPath)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -65,3 +65,4 @@ python3 ./test.py -f 7-tmq/basic5.py
|
|||
python3 ./test.py -f 7-tmq/subscribeDb.py
|
||||
python3 ./test.py -f 7-tmq/subscribeDb1.py
|
||||
python3 ./test.py -f 7-tmq/subscribeStb.py
|
||||
python3 ./test.py -f 7-tmq/subscribeStb1.py
|
||||
|
|
|
@ -261,7 +261,7 @@ void *threadFunc(void *param) {
|
|||
int64_t startTs = taosGetTimestampUs();
|
||||
TAOS_RES *pRes = taos_query(con, qstr);
|
||||
code = taos_errno(pRes);
|
||||
if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
|
||||
if (code != 0) {
|
||||
pError("failed to insert %s_t%" PRId64 ", reason:%s", stbName, t, tstrerror(code));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
|
|
@ -233,7 +233,6 @@ int64_t getDirectorySize(char* dir) {
|
|||
int queryDB(TAOS* taos, char* command) {
|
||||
TAOS_RES* pRes = taos_query(taos, command);
|
||||
int code = taos_errno(pRes);
|
||||
// if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
|
||||
if (code != 0) {
|
||||
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
|
||||
taos_free_result(pRes);
|
||||
|
|
|
@ -258,7 +258,6 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
|
|||
int queryDB(TAOS* taos, char* command) {
|
||||
TAOS_RES* pRes = taos_query(taos, command);
|
||||
int code = taos_errno(pRes);
|
||||
// if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
|
||||
if (code != 0) {
|
||||
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
|
||||
taos_free_result(pRes);
|
||||
|
|
Loading…
Reference in New Issue