Merge branch '3.0' of github.com:taosdata/TDengine into feature/udf
This commit is contained in:
commit
8e0eda6c0c
|
@ -67,7 +67,13 @@ ELSE ()
|
||||||
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
|
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
|
||||||
ADD_DEFINITIONS("-D_TD_ARM_")
|
ADD_DEFINITIONS("-D_TD_ARM_")
|
||||||
ELSE ()
|
ELSE ()
|
||||||
ADD_DEFINITIONS("-msse4.2 -mfma")
|
ADD_DEFINITIONS("-msse4.2")
|
||||||
|
IF("${FMA_SUPPORT}" MATCHES "true")
|
||||||
|
MESSAGE(STATUS "turn fma function support on")
|
||||||
|
ADD_DEFINITIONS("-mfma")
|
||||||
|
ELSE ()
|
||||||
|
MESSAGE(STATUS "turn fma function support off")
|
||||||
|
ENDIF()
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
|
@ -81,9 +81,10 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(
|
pRes =
|
||||||
pConn,
|
taos_query(pConn,
|
||||||
"create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)");
|
"create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
||||||
|
"from tu1 interval(10m)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -54,6 +54,7 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||||
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
#define colDataSetNotNull_f(bm_, r_) \
|
#define colDataSetNotNull_f(bm_, r_) \
|
||||||
do { \
|
do { \
|
||||||
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
|
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
|
||||||
|
@ -68,16 +69,15 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||||
|
|
||||||
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
|
#define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes))
|
||||||
// SColumnInfoData, rowNumber
|
// SColumnInfoData, rowNumber
|
||||||
#define colDataGetData(p1_, r_) \
|
#define colDataGetData(p1_, r_) \
|
||||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \
|
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
|
||||||
: colDataGetNumData(p1_, r_))
|
|
||||||
|
|
||||||
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON){
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
if(colDataIsNull_var(pColumnInfoData, row)){
|
if (colDataIsNull_var(pColumnInfoData, row)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
char *data = colDataGetVarData(pColumnInfoData, row);
|
char* data = colDataGetVarData(pColumnInfoData, row);
|
||||||
return (*data == TSDB_DATA_TYPE_NULL);
|
return (*data == TSDB_DATA_TYPE_NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +85,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pColumnInfoData->info.type== TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
return colDataIsNull_var(pColumnInfoData, row);
|
return colDataIsNull_var(pColumnInfoData, row);
|
||||||
} else {
|
} else {
|
||||||
if (pColumnInfoData->nullbitmap == NULL) {
|
if (pColumnInfoData->nullbitmap == NULL) {
|
||||||
|
@ -137,7 +137,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
|
||||||
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
|
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
for (int32_t i = start; i < start + nRows; ++i) {
|
for (int32_t i = start; i < start + nRows; ++i) {
|
||||||
colDataSetNull_var(pColumnInfoData,i); // it is a null value of VAR type.
|
colDataSetNull_var(pColumnInfoData, i); // it is a null value of VAR type.
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = start; i < start + nRows; ++i) {
|
for (int32_t i = start; i < start + nRows; ++i) {
|
||||||
|
@ -233,6 +233,8 @@ void blockDebugShowData(const SArray* dataBlocks);
|
||||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||||
tb_uid_t uid, tb_uid_t suid);
|
tb_uid_t uid, tb_uid_t suid);
|
||||||
|
|
||||||
|
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
|
||||||
|
|
||||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
||||||
}
|
}
|
||||||
|
@ -246,10 +248,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
|
||||||
|
|
||||||
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
||||||
int8_t needCompress) {
|
int8_t needCompress) {
|
||||||
int32_t* actualLen = (int32_t*) data;
|
int32_t* actualLen = (int32_t*)data;
|
||||||
data += sizeof(int32_t);
|
data += sizeof(int32_t);
|
||||||
|
|
||||||
uint64_t* groupId = (uint64_t*) data;
|
uint64_t* groupId = (uint64_t*)data;
|
||||||
data += sizeof(uint64_t);
|
data += sizeof(uint64_t);
|
||||||
|
|
||||||
int32_t* colSizes = (int32_t*)data;
|
int32_t* colSizes = (int32_t*)data;
|
||||||
|
|
|
@ -61,11 +61,11 @@ extern "C" {
|
||||||
// ----------------- TSDB COLUMN DEFINITION
|
// ----------------- TSDB COLUMN DEFINITION
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
typedef struct {
|
typedef struct {
|
||||||
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
|
||||||
int32_t type : 8; // column type
|
int8_t type; // column type
|
||||||
int32_t bytes : 24; // column bytes (0~16M)
|
int8_t flags; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON
|
||||||
int32_t flags : 8; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON
|
int32_t bytes; // column bytes (0~16M)
|
||||||
int32_t offset : 24; // point offset in STpRow after the header part.
|
int32_t offset; // point offset in STpRow after the header part.
|
||||||
} STColumn;
|
} STColumn;
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
|
|
|
@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
||||||
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||||||
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||||||
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
||||||
|
// for debug
|
||||||
|
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t index; // index of failed block in submit blocks
|
int32_t index; // index of failed block in submit blocks
|
||||||
|
@ -2130,7 +2132,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp
|
||||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||||
|
|
||||||
pSW->pSchema = (SSchema*)tCoderMalloc(pDecoder, sizeof(SSchema) * pSW->nCols);
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) return -1;
|
if (pSW->pSchema == NULL) return -1;
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
||||||
|
|
|
@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief To judge row type: STpRow/SKvRow
|
* @brief To judge row type: STpRow/SKvRow
|
||||||
*
|
*
|
||||||
|
@ -758,7 +757,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
|
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
|
||||||
*
|
*
|
||||||
|
@ -1250,16 +1248,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief
|
* @brief
|
||||||
*
|
*
|
||||||
* @param pRow
|
* @param pRow
|
||||||
* @param colId
|
* @param colId
|
||||||
* @param colType
|
* @param colType
|
||||||
* @param flen
|
* @param flen
|
||||||
* @param offset
|
* @param offset
|
||||||
* @param colIdx start from 0
|
* @param colIdx start from 0
|
||||||
* @param pVal
|
* @param pVal
|
||||||
* @return FORCE_INLINE
|
* @return FORCE_INLINE
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset,
|
static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset,
|
||||||
col_id_t colIdx, SCellVal *pVal) {
|
col_id_t colIdx, SCellVal *pVal) {
|
||||||
|
@ -1273,14 +1271,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief
|
* @brief
|
||||||
*
|
*
|
||||||
* @param pRow
|
* @param pRow
|
||||||
* @param colId
|
* @param colId
|
||||||
* @param offset
|
* @param offset
|
||||||
* @param colIdx start from 0
|
* @param colIdx start from 0
|
||||||
* @param pVal
|
* @param pVal
|
||||||
* @return FORCE_INLINE
|
* @return FORCE_INLINE
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx,
|
static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx,
|
||||||
SCellVal *pVal) {
|
SCellVal *pVal) {
|
||||||
|
@ -1397,14 +1395,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) {
|
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
|
||||||
STSRowIter iter = {0};
|
STSRowIter iter = {0};
|
||||||
tdSTSRowIterInit(&iter, pSchema);
|
tdSTSRowIterInit(&iter, pSchema);
|
||||||
tdSTSRowIterReset(&iter, row);
|
tdSTSRowIterReset(&iter, row);
|
||||||
printf("%s >>>", tag);
|
printf("%s >>>", tag);
|
||||||
for (int i = 0; i < pSchema->numOfCols; ++i) {
|
for (int i = 0; i < pSchema->numOfCols; ++i) {
|
||||||
STColumn *stCol = pSchema->columns + i;
|
STColumn *stCol = pSchema->columns + i;
|
||||||
SCellVal sVal = { 255, NULL};
|
SCellVal sVal = {255, NULL};
|
||||||
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
|
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "querynodes.h"
|
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
|
||||||
typedef struct SStmtCallback {
|
typedef struct SStmtCallback {
|
||||||
TAOS_STMT* pStmt;
|
TAOS_STMT* pStmt;
|
||||||
|
@ -34,24 +34,26 @@ typedef struct SStmtCallback {
|
||||||
typedef struct SParseContext {
|
typedef struct SParseContext {
|
||||||
uint64_t requestId;
|
uint64_t requestId;
|
||||||
int32_t acctId;
|
int32_t acctId;
|
||||||
const char *db;
|
const char* db;
|
||||||
bool topicQuery;
|
bool topicQuery;
|
||||||
void *pTransporter;
|
void* pTransporter;
|
||||||
SEpSet mgmtEpSet;
|
SEpSet mgmtEpSet;
|
||||||
const char *pSql; // sql string
|
const char* pSql; // sql string
|
||||||
size_t sqlLen; // length of the sql string
|
size_t sqlLen; // length of the sql string
|
||||||
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
|
char* pMsg; // extended error message if exists to help identifying the problem in sql statement.
|
||||||
int32_t msgLen; // max length of the msg
|
int32_t msgLen; // max length of the msg
|
||||||
struct SCatalog *pCatalog;
|
struct SCatalog* pCatalog;
|
||||||
SStmtCallback *pStmtCb;
|
SStmtCallback* pStmtCb;
|
||||||
|
const char* pUser;
|
||||||
|
bool isSuperUser;
|
||||||
} SParseContext;
|
} SParseContext;
|
||||||
|
|
||||||
typedef struct SCmdMsgInfo {
|
typedef struct SCmdMsgInfo {
|
||||||
int16_t msgType;
|
int16_t msgType;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
void* pMsg;
|
void* pMsg;
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
void* pExtension; // todo remove it soon
|
void* pExtension; // todo remove it soon
|
||||||
} SCmdMsgInfo;
|
} SCmdMsgInfo;
|
||||||
|
|
||||||
typedef enum EQueryExecMode {
|
typedef enum EQueryExecMode {
|
||||||
|
@ -63,21 +65,21 @@ typedef enum EQueryExecMode {
|
||||||
|
|
||||||
typedef struct SQuery {
|
typedef struct SQuery {
|
||||||
EQueryExecMode execMode;
|
EQueryExecMode execMode;
|
||||||
bool haveResultSet;
|
bool haveResultSet;
|
||||||
SNode* pRoot;
|
SNode* pRoot;
|
||||||
int32_t numOfResCols;
|
int32_t numOfResCols;
|
||||||
SSchema* pResSchema;
|
SSchema* pResSchema;
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
SCmdMsgInfo* pCmdMsg;
|
SCmdMsgInfo* pCmdMsg;
|
||||||
int32_t msgType;
|
int32_t msgType;
|
||||||
SArray* pDbList;
|
SArray* pDbList;
|
||||||
SArray* pTableList;
|
SArray* pTableList;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
int32_t placeholderNum;
|
int32_t placeholderNum;
|
||||||
} SQuery;
|
} SQuery;
|
||||||
|
|
||||||
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
|
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
|
||||||
bool isInsertSql(const char* pStr, size_t length);
|
bool isInsertSql(const char* pStr, size_t length);
|
||||||
|
|
||||||
void qDestroyQuery(SQuery* pQueryNode);
|
void qDestroyQuery(SQuery* pQueryNode);
|
||||||
|
|
||||||
|
@ -89,14 +91,16 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
|
||||||
void qFreeStmtDataBlock(void* pDataBlock);
|
void qFreeStmtDataBlock(void* pDataBlock);
|
||||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
|
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
|
||||||
void qDestroyStmtDataBlock(void* pBlock);
|
void qDestroyStmtDataBlock(void* pBlock);
|
||||||
int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen);
|
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||||
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum);
|
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||||
int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields);
|
int32_t rowNum);
|
||||||
int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields);
|
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD** fields);
|
||||||
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen);
|
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields);
|
||||||
void destroyBoundColumnInfo(void* pBoundInfo);
|
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, SName* pName, TAOS_MULTI_BIND* bind,
|
||||||
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen);
|
char* msgBuf, int32_t msgBufLen);
|
||||||
|
void destroyBoundColumnInfo(void* pBoundInfo);
|
||||||
|
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
|
||||||
|
int32_t msgBufLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,8 +70,10 @@ typedef struct {
|
||||||
} STaskDispatcherShuffle;
|
} STaskDispatcherShuffle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t reserved;
|
int8_t reserved;
|
||||||
|
SSchemaWrapper* pSchemaWrapper;
|
||||||
// not applicable to encoder and decoder
|
// not applicable to encoder and decoder
|
||||||
|
STSchema* pTSchema;
|
||||||
SHashObj* pHash; // groupId to tbuid
|
SHashObj* pHash; // groupId to tbuid
|
||||||
} STaskSinkTb;
|
} STaskSinkTb;
|
||||||
|
|
||||||
|
|
|
@ -649,6 +649,19 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
|
||||||
|
|
||||||
|
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
|
||||||
|
pStmt->bInfo.needParse = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
|
||||||
|
taos_free_result(pStmt->exec.pRequest);
|
||||||
|
pStmt->exec.pRequest = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pStmt->exec.pRequest) {
|
||||||
|
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
|
||||||
|
}
|
||||||
|
|
||||||
if (pStmt->bInfo.needParse) {
|
if (pStmt->bInfo.needParse) {
|
||||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||||
}
|
}
|
||||||
|
@ -658,8 +671,11 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
|
||||||
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
|
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
|
||||||
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
|
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
|
||||||
pStmt->exec.pRequest->body.pDag = NULL;
|
pStmt->exec.pRequest->body.pDag = NULL;
|
||||||
|
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
|
||||||
|
} else {
|
||||||
|
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
*nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);
|
*nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);
|
||||||
} else {
|
} else {
|
||||||
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
|
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
|
||||||
|
|
|
@ -1594,3 +1594,68 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
|
||||||
|
SSubmitReq* ret = NULL;
|
||||||
|
|
||||||
|
// cal size
|
||||||
|
int32_t cap = sizeof(SSubmitReq);
|
||||||
|
int32_t sz = taosArrayGetSize(pBlocks);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
// TODO min
|
||||||
|
int32_t rowSize = pDataBlock->info.rowSize;
|
||||||
|
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||||
|
cap += sizeof(SSubmitBlk) + rows * maxLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
// assign data
|
||||||
|
ret = taosMemoryCalloc(1, cap);
|
||||||
|
ret->version = htonl(1);
|
||||||
|
ret->length = htonl(cap - sizeof(SSubmitReq));
|
||||||
|
ret->numOfBlocks = htonl(sz);
|
||||||
|
|
||||||
|
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||||
|
|
||||||
|
SSubmitBlk* blkHead = submitBlk;
|
||||||
|
blkHead->numOfRows = htons(pDataBlock->info.rows);
|
||||||
|
blkHead->schemaLen = 0;
|
||||||
|
blkHead->sversion = htonl(pTSchema->version);
|
||||||
|
// TODO
|
||||||
|
blkHead->suid = 0;
|
||||||
|
blkHead->uid = htobe64(pDataBlock->info.uid);
|
||||||
|
|
||||||
|
int32_t rows = pDataBlock->info.rows;
|
||||||
|
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
|
||||||
|
/*blkHead->dataLen = htonl(rows * maxLen);*/
|
||||||
|
blkHead->dataLen = 0;
|
||||||
|
|
||||||
|
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
|
||||||
|
STSRow* rowData = blockData;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
|
||||||
|
SRowBuilder rb = {0};
|
||||||
|
tdSRowInit(&rb, pTSchema->version);
|
||||||
|
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
|
||||||
|
tdSRowResetBuf(&rb, rowData);
|
||||||
|
|
||||||
|
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
|
||||||
|
const STColumn* pColumn = &pTSchema->columns[k];
|
||||||
|
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
|
||||||
|
void* data = colDataGetData(pColData, j);
|
||||||
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
|
||||||
|
}
|
||||||
|
int32_t rowLen = TD_ROW_LEN(rowData);
|
||||||
|
rowData = POINTER_SHIFT(rowData, rowLen);
|
||||||
|
blkHead->dataLen += rowLen;
|
||||||
|
}
|
||||||
|
int32_t len = blkHead->dataLen;
|
||||||
|
blkHead->dataLen = htonl(len);
|
||||||
|
blkHead = POINTER_SHIFT(blkHead, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
|
||||||
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
|
||||||
|
@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
int32_t tEncodeSTColumn(SCoder *pEncoder, const STColumn *pCol) {
|
||||||
|
if (tEncodeI16(pEncoder, pCol->colId) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pCol->type) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pCol->sma) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pCol->bytes) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pCol->offset) < 0) return -1;
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSTColumn(SCoder *pDecoder, STColumn *pCol) {
|
||||||
|
if (tDecodeI16(pDecoder, &pCol->colId) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pCol->type) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pCol->sma) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pCol->bytes) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pCol->offset) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSchema(SCoder *pEncoder, const STSchema *pSchema) {
|
||||||
|
if (tEncodeI32(pEncoder, pSchema->numOfCols) < 0) return -1;
|
||||||
|
if (tEncodeI16(pEncoder, pSchema->version) < 0) return -1;
|
||||||
|
if (tEncodeU16(pEncoder, pSchema->flen) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pSchema->vlen) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pSchema->tlen) < 0) return -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < schemaNCols(pSchema); i++) {
|
||||||
|
const STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
|
if (tEncodeSTColumn(pEncoder, pCol) < 0) return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSchema(SCoder *pDecoder, STSchema *pSchema) {
|
||||||
|
if (tDecodeI32(pDecoder, &pSchema->numOfCols) < 0) return -1;
|
||||||
|
if (tDecodeI16(pDecoder, &pSchema->version) < 0) return -1;
|
||||||
|
if (tDecodeU16(pDecoder, &pSchema->flen) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pSchema->vlen) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pSchema->tlen) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
|
||||||
if (pBuilder == NULL) return -1;
|
if (pBuilder == NULL) return -1;
|
||||||
|
|
||||||
|
@ -908,4 +953,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
||||||
taosArrayDestroy(stashRow);
|
taosArrayDestroy(stashRow);
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -224,7 +224,8 @@ struct SConfig *taosGetCfg() {
|
||||||
return tsCfg;
|
return tsCfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) {
|
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
|
||||||
|
char *apolloUrl) {
|
||||||
char cfgDir[PATH_MAX] = {0};
|
char cfgDir[PATH_MAX] = {0};
|
||||||
char cfgFile[PATH_MAX + 100] = {0};
|
char cfgFile[PATH_MAX + 100] = {0};
|
||||||
|
|
||||||
|
@ -300,15 +301,10 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
|
||||||
static int32_t taosAddClientCfg(SConfig *pCfg) {
|
static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
char defaultFqdn[TSDB_FQDN_LEN] = {0};
|
char defaultFqdn[TSDB_FQDN_LEN] = {0};
|
||||||
int32_t defaultServerPort = 6030;
|
int32_t defaultServerPort = 6030;
|
||||||
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
|
||||||
char defaultSecondEp[TSDB_EP_LEN] = {0};
|
|
||||||
|
|
||||||
if (taosGetFqdn(defaultFqdn) != 0) return -1;
|
if (taosGetFqdn(defaultFqdn) != 0) return -1;
|
||||||
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%d", defaultFqdn, defaultServerPort);
|
|
||||||
snprintf(defaultSecondEp, TSDB_EP_LEN, "%s:%d", defaultFqdn, defaultServerPort);
|
|
||||||
|
|
||||||
if (cfgAddString(pCfg, "firstEp", defaultFirstEp, 1) != 0) return -1;
|
if (cfgAddString(pCfg, "firstEp", "", 1) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "secondEp", defaultSecondEp, 1) != 0) return -1;
|
if (cfgAddString(pCfg, "secondEp", "", 1) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
|
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
|
||||||
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
|
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
|
||||||
|
@ -478,15 +474,18 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
|
||||||
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
|
char defaultFirstEp[TSDB_EP_LEN] = {0};
|
||||||
|
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
|
||||||
|
|
||||||
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
|
||||||
SEp firstEp = {0};
|
SEp firstEp = {0};
|
||||||
taosGetFqdnPortFromEp(pFirstEpItem->str, &firstEp);
|
taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
|
||||||
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
|
||||||
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
|
||||||
|
|
||||||
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
|
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
|
||||||
SEp secondEp = {0};
|
SEp secondEp = {0};
|
||||||
taosGetFqdnPortFromEp(pSecondpItem->str, &secondEp);
|
taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? defaultFirstEp : pSecondpItem->str, &secondEp);
|
||||||
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
|
||||||
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
|
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
|
||||||
|
|
||||||
|
@ -583,8 +582,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile,
|
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
||||||
char *apolloUrl, SArray *pArgs, bool tsc) {
|
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
||||||
osDefaultInit();
|
osDefaultInit();
|
||||||
|
|
||||||
SConfig *pCfg = cfgInit();
|
SConfig *pCfg = cfgInit();
|
||||||
|
@ -636,7 +635,24 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
|
static int32_t taosCheckGlobalCfg() {
|
||||||
|
uint32_t ipv4 = taosGetIpv4FromFqdn(tsLocalFqdn);
|
||||||
|
if (ipv4 == 0xffffffff) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
uError("failed to get ip from fqdn:%s since %s, dnode can not be initialized", tsLocalFqdn, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsServerPort <= 0) {
|
||||||
|
uError("invalid server port:%u, dnode can not be initialized", tsServerPort);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
|
||||||
|
bool tsc) {
|
||||||
if (tsCfg != NULL) return 0;
|
if (tsCfg != NULL) return 0;
|
||||||
tsCfg = cfgInit();
|
tsCfg = cfgInit();
|
||||||
|
|
||||||
|
@ -674,6 +690,11 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
|
||||||
taosSetSystemCfg(tsCfg);
|
taosSetSystemCfg(tsCfg);
|
||||||
|
|
||||||
cfgDumpCfg(tsCfg, tsc, false);
|
cfgDumpCfg(tsCfg, tsc, false);
|
||||||
|
|
||||||
|
if (taosCheckGlobalCfg() != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,25 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) {
|
||||||
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
|
||||||
|
while (true) {
|
||||||
|
SSubmitBlk *pBlock = NULL;
|
||||||
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
|
if (pBlock == NULL) break;
|
||||||
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||||
|
STSRowIter rowIter = {0};
|
||||||
|
tdSTSRowIterInit(&rowIter, pTschema);
|
||||||
|
STSRow *row;
|
||||||
|
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||||
|
tdSRowPrint(row, pTschema, "stream");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
|
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
|
||||||
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
|
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
|
||||||
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
|
||||||
|
|
|
@ -419,6 +419,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char key[TSDB_PARTITION_KEY_LEN];
|
char key[TSDB_PARTITION_KEY_LEN];
|
||||||
|
int64_t dbUid;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
} SMqOffsetObj;
|
} SMqOffsetObj;
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
|
||||||
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||||
|
|
||||||
|
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -61,6 +61,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
|
||||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||||
void mndTransProcessRsp(SNodeMsg *pRsp);
|
void mndTransProcessRsp(SNodeMsg *pRsp);
|
||||||
void mndTransPullup(SMnode *pMnode);
|
void mndTransPullup(SMnode *pMnode);
|
||||||
|
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,12 @@
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndAuth.h"
|
#include "mndAuth.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
#include "mndOffset.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSma.h"
|
#include "mndSma.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
#include "mndSubscribe.h"
|
||||||
|
#include "mndTopic.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) {
|
||||||
|
|
||||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
|
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
|
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
|
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
||||||
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
|
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||||
|
|
||||||
int32_t rspLen = 0;
|
int32_t rspLen = 0;
|
||||||
|
@ -1387,7 +1393,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
|
||||||
bool sysDb) {
|
bool sysDb) {
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
|
|
||||||
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
||||||
char *buf = taosMemoryMalloc(bytes);
|
char *buf = taosMemoryMalloc(bytes);
|
||||||
const char *name = mndGetDbStr(pDb->name);
|
const char *name = mndGetDbStr(pDb->name);
|
||||||
if (name != NULL) {
|
if (name != NULL) {
|
||||||
|
|
|
@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
|
||||||
|
SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset);
|
||||||
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
|
SMqOffsetObj *pOffset = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pOffset->dbUid != pDb->uid) {
|
||||||
|
sdbRelease(pSdb, pOffset);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
END:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
|
@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
|
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch
|
// dispatch
|
||||||
|
@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
}
|
}
|
||||||
//
|
//
|
||||||
// dispatch
|
// dispatch
|
||||||
|
@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->smaSink.smaId = pStream->smaId;
|
pTask->smaSink.smaId = pStream->smaId;
|
||||||
} else {
|
} else {
|
||||||
pTask->sinkType = TASK_SINK__TABLE;
|
pTask->sinkType = TASK_SINK__TABLE;
|
||||||
|
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
|
@ -997,7 +997,12 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
||||||
pAction->msgReceived = 0;
|
pAction->msgReceived = 0;
|
||||||
pAction->errCode = 0;
|
pAction->errCode = 0;
|
||||||
} else {
|
} else {
|
||||||
if (terrno == TSDB_CODE_INVALID_PTR) rpcFreeCont(rpcMsg.pCont);
|
pAction->msgSent = 0;
|
||||||
|
pAction->msgReceived = 0;
|
||||||
|
pAction->errCode = terrno;
|
||||||
|
if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) {
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
}
|
||||||
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1275,7 +1280,7 @@ static int32_t mndProcessTransReq(SNodeMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
||||||
SArray *pArray = NULL;
|
SArray *pArray = NULL;
|
||||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
|
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
|
||||||
pArray = pTrans->redoActions;
|
pArray = pTrans->redoActions;
|
||||||
|
@ -1293,14 +1298,14 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
||||||
if (pAction == NULL) continue;
|
if (pAction == NULL) continue;
|
||||||
|
|
||||||
if (pAction->msgReceived == 0) {
|
if (pAction->msgReceived == 0) {
|
||||||
mInfo("trans:%d, action:%d set processed", pTrans->id, i);
|
mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i);
|
||||||
pAction->msgSent = 1;
|
pAction->msgSent = 1;
|
||||||
pAction->msgReceived = 1;
|
pAction->msgReceived = 1;
|
||||||
pAction->errCode = 0;
|
pAction->errCode = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pAction->errCode != 0) {
|
if (pAction->errCode != 0) {
|
||||||
mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i,
|
mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i,
|
||||||
tstrerror(pAction->errCode));
|
tstrerror(pAction->errCode));
|
||||||
pAction->msgSent = 1;
|
pAction->msgSent = 1;
|
||||||
pAction->msgReceived = 1;
|
pAction->msgReceived = 1;
|
||||||
|
|
|
@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
void SetUp() override {}
|
void SetUp() override {}
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
|
|
||||||
int32_t CreateUserLog(const char *acct, const char *user) {
|
int32_t CreateUserLog(const char *acct, const char *user, ETrnType type, SDbObj *pDb) {
|
||||||
SUserObj userObj = {0};
|
SUserObj userObj = {0};
|
||||||
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
||||||
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||||
|
@ -96,7 +96,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
userObj.superUser = 1;
|
userObj.superUser = 1;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, type, &rpcMsg);
|
||||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||||
mndTransAppendRedolog(pTrans, pRedoRaw);
|
mndTransAppendRedolog(pTrans, pRedoRaw);
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
@ -108,13 +108,18 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
char *param = strdup("====> test log <=====");
|
char *param = strdup("====> test log <=====");
|
||||||
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
|
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
|
||||||
|
|
||||||
|
if (pDb != NULL) {
|
||||||
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = mndTransPrepare(pMnode, pTrans);
|
int32_t code = mndTransPrepare(pMnode, pTrans);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy) {
|
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnType type,
|
||||||
|
SDbObj *pDb) {
|
||||||
SUserObj userObj = {0};
|
SUserObj userObj = {0};
|
||||||
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
||||||
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||||
|
@ -124,7 +129,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
userObj.superUser = 1;
|
userObj.superUser = 1;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
STrans *pTrans = mndTransCreate(pMnode, policy, TRN_TYPE_CREATE_USER, &rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, policy, type, &rpcMsg);
|
||||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||||
mndTransAppendRedolog(pTrans, pRedoRaw);
|
mndTransAppendRedolog(pTrans, pRedoRaw);
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
@ -170,6 +175,44 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
mndTransAppendUndoAction(pTrans, &action);
|
mndTransAppendUndoAction(pTrans, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
void *pRsp = taosMemoryCalloc(1, 256);
|
||||||
|
strcpy((char *)pRsp, "simple rsponse");
|
||||||
|
mndTransSetRpcRsp(pTrans, pRsp, 256);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDb != NULL) {
|
||||||
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = mndTransPrepare(pMnode, pTrans);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t CreateUserGlobal(const char *acct, const char *user) {
|
||||||
|
SUserObj userObj = {0};
|
||||||
|
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
||||||
|
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||||
|
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
|
||||||
|
userObj.createdTime = taosGetTimestampMs();
|
||||||
|
userObj.updateTime = userObj.createdTime;
|
||||||
|
userObj.superUser = 1;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
|
||||||
|
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||||
|
mndTransAppendRedolog(pTrans, pRedoRaw);
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
|
||||||
|
mndTransAppendUndolog(pTrans, pUndoRaw);
|
||||||
|
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
|
||||||
|
|
||||||
|
char *param = strdup("====> test log <=====");
|
||||||
|
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
|
||||||
|
|
||||||
int32_t code = mndTransPrepare(pMnode, pTrans);
|
int32_t code = mndTransPrepare(pMnode, pTrans);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
|
@ -189,12 +232,12 @@ TEST_F(MndTestTrans2, 01_Log) {
|
||||||
|
|
||||||
ASSERT_NE(pMnode, nullptr);
|
ASSERT_NE(pMnode, nullptr);
|
||||||
|
|
||||||
EXPECT_EQ(CreateUserLog(acct, user1), 0);
|
EXPECT_EQ(CreateUserLog(acct, user1, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
pUser1 = mndAcquireUser(pMnode, user1);
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
ASSERT_NE(pUser1, nullptr);
|
ASSERT_NE(pUser1, nullptr);
|
||||||
|
|
||||||
// failed to create user and rollback
|
// failed to create user and rollback
|
||||||
EXPECT_EQ(CreateUserLog(acct_invalid, user2), 0);
|
EXPECT_EQ(CreateUserLog(acct_invalid, user2, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
pUser2 = mndAcquireUser(pMnode, user2);
|
pUser2 = mndAcquireUser(pMnode, user2);
|
||||||
ASSERT_EQ(pUser2, nullptr);
|
ASSERT_EQ(pUser2, nullptr);
|
||||||
|
|
||||||
|
@ -214,44 +257,46 @@ TEST_F(MndTestTrans2, 02_Action) {
|
||||||
|
|
||||||
ASSERT_NE(pMnode, nullptr);
|
ASSERT_NE(pMnode, nullptr);
|
||||||
|
|
||||||
// failed to create user and rollback
|
|
||||||
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK), 0);
|
|
||||||
pUser1 = mndAcquireUser(pMnode, user1);
|
|
||||||
ASSERT_EQ(pUser1, nullptr);
|
|
||||||
mndReleaseUser(pMnode, pUser1);
|
|
||||||
|
|
||||||
// create user, and fake a response
|
|
||||||
{
|
{
|
||||||
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK), 0);
|
// failed to create user and rollback
|
||||||
pUser1 = mndAcquireUser(pMnode, user1);
|
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
ASSERT_NE(pUser1, nullptr);
|
|
||||||
mndReleaseUser(pMnode, pUser1);
|
|
||||||
|
|
||||||
transId = 4;
|
|
||||||
pTrans = mndAcquireTrans(pMnode, transId);
|
|
||||||
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
|
|
||||||
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
|
|
||||||
EXPECT_EQ(pTrans->failedTimes, 1);
|
|
||||||
|
|
||||||
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
|
|
||||||
pAction->msgSent = 1;
|
|
||||||
|
|
||||||
SNodeMsg rspMsg = {0};
|
|
||||||
rspMsg.pNode = pMnode;
|
|
||||||
int64_t signature = transId;
|
|
||||||
signature = (signature << 32);
|
|
||||||
signature += action;
|
|
||||||
rspMsg.rpcMsg.ahandle = (void *)signature;
|
|
||||||
mndTransProcessRsp(&rspMsg);
|
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
|
||||||
|
|
||||||
pUser1 = mndAcquireUser(pMnode, user1);
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
ASSERT_EQ(pUser1, nullptr);
|
ASSERT_EQ(pUser1, nullptr);
|
||||||
mndReleaseUser(pMnode, pUser1);
|
mndReleaseUser(pMnode, pUser1);
|
||||||
|
|
||||||
|
// create user, and fake a response
|
||||||
|
{
|
||||||
|
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
|
ASSERT_NE(pUser1, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser1);
|
||||||
|
|
||||||
|
transId = 4;
|
||||||
|
pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
|
||||||
|
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
|
||||||
|
EXPECT_EQ(pTrans->failedTimes, 1);
|
||||||
|
|
||||||
|
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
|
||||||
|
pAction->msgSent = 1;
|
||||||
|
|
||||||
|
SNodeMsg rspMsg = {0};
|
||||||
|
rspMsg.pNode = pMnode;
|
||||||
|
int64_t signature = transId;
|
||||||
|
signature = (signature << 32);
|
||||||
|
signature += action;
|
||||||
|
rspMsg.rpcMsg.ahandle = (void *)signature;
|
||||||
|
mndTransProcessRsp(&rspMsg);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
|
||||||
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
|
ASSERT_EQ(pUser1, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY), 0);
|
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
pUser1 = mndAcquireUser(pMnode, user1);
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
ASSERT_NE(pUser1, nullptr);
|
ASSERT_NE(pUser1, nullptr);
|
||||||
mndReleaseUser(pMnode, pUser1);
|
mndReleaseUser(pMnode, pUser1);
|
||||||
|
@ -305,4 +350,164 @@ TEST_F(MndTestTrans2, 02_Action) {
|
||||||
mndReleaseUser(pMnode, pUser1);
|
mndReleaseUser(pMnode, pUser1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
EXPECT_EQ(CreateUserAction(acct, user2, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
|
SUserObj *pUser2 = (SUserObj *)sdbAcquire(pMnode->pSdb, SDB_USER, user2);
|
||||||
|
ASSERT_NE(pUser2, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser2);
|
||||||
|
|
||||||
|
{
|
||||||
|
transId = 6;
|
||||||
|
pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
|
||||||
|
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
|
||||||
|
EXPECT_EQ(pTrans->failedTimes, 1);
|
||||||
|
|
||||||
|
SNodeMsg rspMsg = {0};
|
||||||
|
rspMsg.pNode = pMnode;
|
||||||
|
int64_t signature = transId;
|
||||||
|
signature = (signature << 32);
|
||||||
|
signature += action;
|
||||||
|
rspMsg.rpcMsg.ahandle = (void *)signature;
|
||||||
|
rspMsg.rpcMsg.code = 0;
|
||||||
|
mndTransProcessRsp(&rspMsg);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
|
||||||
|
pUser2 = mndAcquireUser(pMnode, user2);
|
||||||
|
ASSERT_NE(pUser2, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser2);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
transId = 6;
|
||||||
|
pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
|
||||||
|
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
|
||||||
|
EXPECT_EQ(pTrans->failedTimes, 2);
|
||||||
|
|
||||||
|
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
|
||||||
|
pAction->msgSent = 1;
|
||||||
|
|
||||||
|
SNodeMsg rspMsg = {0};
|
||||||
|
rspMsg.pNode = pMnode;
|
||||||
|
int64_t signature = transId;
|
||||||
|
signature = (signature << 32);
|
||||||
|
signature += action;
|
||||||
|
rspMsg.rpcMsg.ahandle = (void *)signature;
|
||||||
|
mndTransProcessRsp(&rspMsg);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
|
||||||
|
pUser2 = mndAcquireUser(pMnode, user2);
|
||||||
|
ASSERT_EQ(pUser2, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser2);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(MndTestTrans2, 03_Kill) {
|
||||||
|
const char *acct = "root";
|
||||||
|
const char *user1 = "kill1";
|
||||||
|
const char *user2 = "kill2";
|
||||||
|
SUserObj *pUser1 = NULL;
|
||||||
|
SUserObj *pUser2 = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
int32_t transId = 0;
|
||||||
|
int32_t action = 0;
|
||||||
|
|
||||||
|
ASSERT_NE(pMnode, nullptr);
|
||||||
|
|
||||||
|
{
|
||||||
|
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
|
||||||
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
|
ASSERT_NE(pUser1, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser1);
|
||||||
|
|
||||||
|
transId = 7;
|
||||||
|
pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
|
||||||
|
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
|
||||||
|
EXPECT_EQ(pTrans->failedTimes, 1);
|
||||||
|
|
||||||
|
mndKillTrans(pMnode, pTrans);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
|
||||||
|
pUser1 = mndAcquireUser(pMnode, user1);
|
||||||
|
ASSERT_EQ(pUser1, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(MndTestTrans2, 04_Conflict) {
|
||||||
|
const char *acct = "root";
|
||||||
|
const char *user1 = "conflict1";
|
||||||
|
const char *user2 = "conflict2";
|
||||||
|
const char *user3 = "conflict3";
|
||||||
|
const char *user4 = "conflict4";
|
||||||
|
const char *user5 = "conflict5";
|
||||||
|
const char *user6 = "conflict6";
|
||||||
|
const char *user7 = "conflict7";
|
||||||
|
const char *user8 = "conflict8";
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
int32_t transId = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
ASSERT_NE(pMnode, nullptr);
|
||||||
|
|
||||||
|
{
|
||||||
|
SDbObj dbObj1 = {0};
|
||||||
|
dbObj1.uid = 9521;
|
||||||
|
strcpy(dbObj1.name, "db");
|
||||||
|
SDbObj dbObj2 = {0};
|
||||||
|
dbObj2.uid = 9522;
|
||||||
|
strcpy(dbObj2.name, "conflict db");
|
||||||
|
|
||||||
|
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &dbObj1), 0);
|
||||||
|
pUser = mndAcquireUser(pMnode, user1);
|
||||||
|
ASSERT_NE(pUser, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
|
||||||
|
transId = 8;
|
||||||
|
pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
|
||||||
|
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
|
||||||
|
|
||||||
|
// stb scope
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DNODE, NULL), -1);
|
||||||
|
code = terrno;
|
||||||
|
EXPECT_EQ(code, TSDB_CODE_MND_TRANS_CONFLICT);
|
||||||
|
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DB, &dbObj1), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DB, &dbObj2), 0);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user3, TRN_TYPE_CREATE_STB, &dbObj1), 0);
|
||||||
|
|
||||||
|
// db scope
|
||||||
|
pTrans->type = TRN_TYPE_CREATE_DB;
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DNODE, NULL), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DB, &dbObj1), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DB, &dbObj2), 0);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user5, TRN_TYPE_CREATE_STB, &dbObj1), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user5, TRN_TYPE_CREATE_STB, &dbObj2), 0);
|
||||||
|
|
||||||
|
// global scope
|
||||||
|
pTrans->type = TRN_TYPE_CREATE_DNODE;
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user6, TRN_TYPE_CREATE_DNODE, NULL), 0);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj1), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj2), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_STB, &dbObj1), -1);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_STB, &dbObj2), -1);
|
||||||
|
|
||||||
|
// global scope
|
||||||
|
pTrans->type = TRN_TYPE_CREATE_USER;
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj1), 0);
|
||||||
|
EXPECT_EQ(CreateUserLog(acct, user8, TRN_TYPE_CREATE_DB, &dbObj2), 0);
|
||||||
|
|
||||||
|
mndKillTrans(pMnode, pTrans);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
|
||||||
|
pUser = mndAcquireUser(pMnode, user1);
|
||||||
|
ASSERT_EQ(pUser, nullptr);
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
}
|
||||||
|
}
|
|
@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
pTask->ahandle = pTq->pVnode;
|
pTask->ahandle = pTq->pVnode;
|
||||||
if (pTask->sinkType == TASK_SINK__SMA) {
|
if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
pTask->smaSink.smaHandle = smaHandleRes;
|
pTask->smaSink.smaHandle = smaHandleRes;
|
||||||
|
} else if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
|
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||||
|
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||||
|
pTask->tbSink.pTSchema =
|
||||||
|
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
|
||||||
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||||
|
|
|
@ -4830,6 +4830,7 @@ static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
|
||||||
static SArray* createIndexMap(SNodeList* pNodeList);
|
static SArray* createIndexMap(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
|
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
|
||||||
|
|
||||||
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
||||||
SInterval interval = {
|
SInterval interval = {
|
||||||
|
@ -5624,25 +5625,29 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->resultInfo.capacity = 4096;
|
initResultSizeInfo(pOperator, 4096);
|
||||||
pOperator->resultInfo.threshold = 4096 * 0.75;
|
|
||||||
|
|
||||||
// initResultRowInf
|
pInfo->pRes = pResBlock;
|
||||||
// o(&pInfo->binfo.resultRowInfo, 8);
|
pOperator->name = "MergeJoinOperator";
|
||||||
pInfo->pRes = pResBlock;
|
|
||||||
|
|
||||||
pOperator->name = "JoinOperator";
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
|
||||||
|
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
|
||||||
|
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
|
||||||
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -5651,3 +5656,11 @@ _error:
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
|
||||||
|
pColumn->slotId = pColumnNode->slotId;
|
||||||
|
pColumn->type = pColumnNode->node.resType.type;
|
||||||
|
pColumn->bytes = pColumnNode->node.resType.bytes;
|
||||||
|
pColumn->precision = pColumnNode->node.resType.precision;
|
||||||
|
pColumn->scale = pColumnNode->node.resType.scale;
|
||||||
|
}
|
||||||
|
|
|
@ -913,7 +913,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "tbname",
|
.name = "tbname",
|
||||||
.type = FUNCTION_TYPE_TBNAME,
|
.type = FUNCTION_TYPE_TBNAME,
|
||||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
|
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
|
||||||
.translateFunc = translateTbnameColumn,
|
.translateFunc = translateTbnameColumn,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
|
|
|
@ -100,6 +100,17 @@ void generateInformationSchema(MockCatalogService* mcs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Table:t1
|
||||||
|
* Field | Type | DataType | Bytes |
|
||||||
|
* ==========================================================================
|
||||||
|
* ts | column | TIMESTAMP | 8 |
|
||||||
|
* c1 | column | INT | 4 |
|
||||||
|
* c2 | column | VARCHAR | 20 |
|
||||||
|
* c3 | column | BIGINT | 8 |
|
||||||
|
* c4 | column | DOUBLE | 8 |
|
||||||
|
* c5 | column | DOUBLE | 8 |
|
||||||
|
*/
|
||||||
void generateTestT1(MockCatalogService* mcs) {
|
void generateTestT1(MockCatalogService* mcs) {
|
||||||
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6)
|
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6)
|
||||||
.setPrecision(TSDB_TIME_PRECISION_MILLI)
|
.setPrecision(TSDB_TIME_PRECISION_MILLI)
|
||||||
|
@ -113,6 +124,17 @@ void generateTestT1(MockCatalogService* mcs) {
|
||||||
builder.done();
|
builder.done();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Super Table: st1
|
||||||
|
* Field | Type | DataType | Bytes |
|
||||||
|
* ==========================================================================
|
||||||
|
* ts | column | TIMESTAMP | 8 |
|
||||||
|
* c1 | column | INT | 4 |
|
||||||
|
* c2 | column | VARCHAR | 20 |
|
||||||
|
* tag1 | tag | INT | 4 |
|
||||||
|
* tag2 | tag | VARCHAR | 20 |
|
||||||
|
* Child Table: st1s1, st1s2
|
||||||
|
*/
|
||||||
void generateTestST1(MockCatalogService* mcs) {
|
void generateTestST1(MockCatalogService* mcs) {
|
||||||
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2)
|
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2)
|
||||||
.setPrecision(TSDB_TIME_PRECISION_MILLI)
|
.setPrecision(TSDB_TIME_PRECISION_MILLI)
|
||||||
|
|
|
@ -23,4 +23,6 @@ TEST_F(PlanSuperTableTest, tbname) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("select tbname from st1");
|
run("select tbname from st1");
|
||||||
|
|
||||||
|
run("select tbname, tag1, tag2 from st1");
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,11 +36,11 @@ class PlannerEnv : public testing::Environment {
|
||||||
static void parseArg(int argc, char* argv[]) {
|
static void parseArg(int argc, char* argv[]) {
|
||||||
int opt = 0;
|
int opt = 0;
|
||||||
const char* optstring = "";
|
const char* optstring = "";
|
||||||
static struct option long_options[] = {{"dump", no_argument, NULL, 'd'}, {0, 0, 0, 0}};
|
static struct option long_options[] = {{"dump", optional_argument, NULL, 'd'}, {0, 0, 0, 0}};
|
||||||
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
|
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
|
||||||
switch (opt) {
|
switch (opt) {
|
||||||
case 'd':
|
case 'd':
|
||||||
g_isDump = true;
|
setDumpModule(optarg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -34,7 +34,41 @@ using namespace testing;
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
bool g_isDump = false;
|
enum DumpModule {
|
||||||
|
DUMP_MODULE_NOTHING = 1,
|
||||||
|
DUMP_MODULE_PARSER,
|
||||||
|
DUMP_MODULE_LOGIC,
|
||||||
|
DUMP_MODULE_OPTIMIZED,
|
||||||
|
DUMP_MODULE_SPLIT,
|
||||||
|
DUMP_MODULE_SCALED,
|
||||||
|
DUMP_MODULE_PHYSICAL,
|
||||||
|
DUMP_MODULE_SUBPLAN,
|
||||||
|
DUMP_MODULE_ALL
|
||||||
|
};
|
||||||
|
|
||||||
|
DumpModule g_dumpModule = DUMP_MODULE_NOTHING;
|
||||||
|
|
||||||
|
void setDumpModule(const char* pModule) {
|
||||||
|
if (NULL == pModule) {
|
||||||
|
g_dumpModule = DUMP_MODULE_ALL;
|
||||||
|
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_PARSER;
|
||||||
|
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_LOGIC;
|
||||||
|
} else if (0 == strncasecmp(pModule, "optimized", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_OPTIMIZED;
|
||||||
|
} else if (0 == strncasecmp(pModule, "split", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_SPLIT;
|
||||||
|
} else if (0 == strncasecmp(pModule, "scaled", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_SCALED;
|
||||||
|
} else if (0 == strncasecmp(pModule, "physical", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_PHYSICAL;
|
||||||
|
} else if (0 == strncasecmp(pModule, "subplan", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_SUBPLAN;
|
||||||
|
} else if (0 == strncasecmp(pModule, "all", strlen(pModule))) {
|
||||||
|
g_dumpModule = DUMP_MODULE_PHYSICAL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class PlannerTestBaseImpl {
|
class PlannerTestBaseImpl {
|
||||||
public:
|
public:
|
||||||
|
@ -66,11 +100,9 @@ class PlannerTestBaseImpl {
|
||||||
SQueryPlan* pPlan = nullptr;
|
SQueryPlan* pPlan = nullptr;
|
||||||
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
|
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
|
||||||
|
|
||||||
if (g_isDump) {
|
dump(g_dumpModule);
|
||||||
dump();
|
|
||||||
}
|
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
dump();
|
dump(DUMP_MODULE_ALL);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -109,23 +141,48 @@ class PlannerTestBaseImpl {
|
||||||
res_.physiSubplans_.clear();
|
res_.physiSubplans_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
void dump() {
|
void dump(DumpModule module) {
|
||||||
|
if (DUMP_MODULE_NOTHING == module) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl;
|
cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl;
|
||||||
cout << "syntax tree : " << endl;
|
|
||||||
cout << res_.ast_ << endl;
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
|
||||||
cout << "raw logic plan : " << endl;
|
cout << "syntax tree : " << endl;
|
||||||
cout << res_.rawLogicPlan_ << endl;
|
cout << res_.ast_ << endl;
|
||||||
cout << "optimized logic plan : " << endl;
|
}
|
||||||
cout << res_.optimizedLogicPlan_ << endl;
|
|
||||||
cout << "split logic plan : " << endl;
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) {
|
||||||
cout << res_.splitLogicPlan_ << endl;
|
cout << "raw logic plan : " << endl;
|
||||||
cout << "scaled logic plan : " << endl;
|
cout << res_.rawLogicPlan_ << endl;
|
||||||
cout << res_.scaledLogicPlan_ << endl;
|
}
|
||||||
cout << "physical plan : " << endl;
|
|
||||||
cout << res_.physiPlan_ << endl;
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_OPTIMIZED == module) {
|
||||||
cout << "physical subplan : " << endl;
|
cout << "optimized logic plan : " << endl;
|
||||||
for (const auto& subplan : res_.physiSubplans_) {
|
cout << res_.optimizedLogicPlan_ << endl;
|
||||||
cout << subplan << endl;
|
}
|
||||||
|
|
||||||
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SPLIT == module) {
|
||||||
|
cout << "split logic plan : " << endl;
|
||||||
|
cout << res_.splitLogicPlan_ << endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SCALED == module) {
|
||||||
|
cout << "scaled logic plan : " << endl;
|
||||||
|
cout << res_.scaledLogicPlan_ << endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) {
|
||||||
|
cout << "physical plan : " << endl;
|
||||||
|
cout << res_.physiPlan_ << endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) {
|
||||||
|
cout << "physical subplan : " << endl;
|
||||||
|
for (const auto& subplan : res_.physiSubplans_) {
|
||||||
|
cout << subplan << endl;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,6 @@ class PlannerTestBase : public testing::Test {
|
||||||
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
||||||
};
|
};
|
||||||
|
|
||||||
extern bool g_isDump;
|
extern void setDumpModule(const char* pModule);
|
||||||
|
|
||||||
#endif // PLAN_TEST_UTIL_H
|
#endif // PLAN_TEST_UTIL_H
|
||||||
|
|
|
@ -297,6 +297,22 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn
|
||||||
taosMemoryFree(t);
|
taosMemoryFree(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void ncharToVar(char* buf, SScalarParam* pOut, int32_t rowIndex) {
|
||||||
|
int32_t inputLen = varDataLen(buf);
|
||||||
|
|
||||||
|
char* t = taosMemoryCalloc(1, inputLen + VARSTR_HEADER_SIZE);
|
||||||
|
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(buf), varDataLen(buf), varDataVal(t));
|
||||||
|
if (len < 0) {
|
||||||
|
taosMemoryFree(t);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
varDataSetLen(t, len);
|
||||||
|
|
||||||
|
colDataAppend(pOut->columnData, rowIndex, t, false);
|
||||||
|
taosMemoryFree(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
//TODO opt performance, tmp is not needed.
|
//TODO opt performance, tmp is not needed.
|
||||||
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
|
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
|
||||||
int32_t bufSize = pIn->columnData->info.bytes;
|
int32_t bufSize = pIn->columnData->info.bytes;
|
||||||
|
@ -313,6 +329,10 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
|
||||||
func = varToUnsigned;
|
func = varToUnsigned;
|
||||||
} else if (IS_FLOAT_TYPE(outType)) {
|
} else if (IS_FLOAT_TYPE(outType)) {
|
||||||
func = varToFloat;
|
func = varToFloat;
|
||||||
|
} else if (outType == TSDB_DATA_TYPE_BINARY) { // nchar -> binary
|
||||||
|
ASSERT(inType == TSDB_DATA_TYPE_NCHAR);
|
||||||
|
func = ncharToVar;
|
||||||
|
vton = true;
|
||||||
} else if (outType == TSDB_DATA_TYPE_NCHAR) { // binary -> nchar
|
} else if (outType == TSDB_DATA_TYPE_NCHAR) { // binary -> nchar
|
||||||
ASSERT(inType == TSDB_DATA_TYPE_VARCHAR);
|
ASSERT(inType == TSDB_DATA_TYPE_VARCHAR);
|
||||||
func = varToNchar;
|
func = varToNchar;
|
||||||
|
@ -608,7 +628,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
|
||||||
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 0, 7, 0, 0,
|
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 0, 7, 0, 0,
|
||||||
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0,
|
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0,
|
||||||
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0,
|
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0,
|
||||||
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 7, 7, 7, 7, 0, 0, 0, 0,
|
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0,
|
||||||
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0,
|
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0,
|
||||||
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
|
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
|
||||||
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0,
|
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0,
|
||||||
|
|
|
@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
//
|
/*blockDebugShowData(pRes);*/
|
||||||
blockDebugShowData(pRes);
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
|
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
|
||||||
|
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||||
//
|
//
|
||||||
|
@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
|
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
|
||||||
|
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
|
@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
|
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
|
||||||
|
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||||
|
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
|
||||||
|
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue