Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

This commit is contained in:
Hongze Cheng 2022-06-07 08:10:35 +00:00
commit fa8c6f26d5
28 changed files with 546 additions and 373 deletions

View File

@ -199,10 +199,9 @@ curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
`connect()` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明: `connect()` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:
- `host` 要连接的主机。默认是 localhost - `url` taosAdapter REST 服务的 URL。默认是 <http://localhost:6041>
- `user` TDenigne 用户名。默认是 root。 - `user` TDenigne 用户名。默认是 root。
- `password` TDeingine 用户密码。默认是 taosdata。 - `password` TDeingine 用户密码。默认是 taosdata。
- `port`: taosAdapter REST 服务监听端口。默认是 6041.
- `timeout`: HTTP 请求超时时间。单位为秒。默认为 `socket._GLOBAL_DEFAULT_TIMEOUT`。 一般无需配置。 - `timeout`: HTTP 请求超时时间。单位为秒。默认为 `socket._GLOBAL_DEFAULT_TIMEOUT`。 一般无需配置。
</TabItem> </TabItem>

View File

@ -74,7 +74,7 @@ title: 常见问题及反馈
检查服务器侧 TCP 端口连接是否工作:`nc -l {port}` 检查服务器侧 TCP 端口连接是否工作:`nc -l {port}`
检查客户端侧 TCP 端口连接是否工作:`nc {hostIP} {port}` 检查客户端侧 TCP 端口连接是否工作:`nc {hostIP} {port}`
- Windows 系统请使用 PowerShell 命令 Net-TestConnection -ComputerName {fqdn} -Port {port} 检测服务段端口是否访问 - Windows 系统请使用 PowerShell 命令 Test-NetConnection -ComputerName {fqdn} -Port {port} 检测服务段端口是否访问
10. 也可以使用 taos 程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括 TCP 和 UDP[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)。 10. 也可以使用 taos 程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅(包括 TCP 和 UDP[TDengine 内嵌网络检测工具使用指南](https://www.taosdata.com/blog/2020/09/08/1816.html)。

View File

@ -199,10 +199,10 @@ The `connect()` function returns a `taos.TaosConnection` instance. In client-sid
All arguments to the `connect()` function are optional keyword arguments. The following are the connection parameters specified. All arguments to the `connect()` function are optional keyword arguments. The following are the connection parameters specified.
- - `url` The URL of taosAdapter REST service. The default is <http://localhost:6041>.
- `host`: The host to connect to. The default is localhost. - `host`: The host to connect to. The default is localhost.
- `user`: TDengine user name. The default is `root`. - `user`: TDengine user name. The default is `root`.
- `password`: TDengine user password. The default is `taosdata`. - `password`: TDengine user password. The default is `taosdata`.
- `port`: The port on which the taosAdapter REST service listens. Default is 6041.
- `timeout`: HTTP request timeout in seconds. The default is `socket._GLOBAL_DEFAULT_TIMEOUT`. Usually, no configuration is needed. - `timeout`: HTTP request timeout in seconds. The default is `socket._GLOBAL_DEFAULT_TIMEOUT`. Usually, no configuration is needed.
</TabItem> </TabItem>

View File

@ -1,10 +1,9 @@
# ANCHOR: connect # ANCHOR: connect
from taosrest import connect, TaosRestConnection, TaosRestCursor from taosrest import connect, TaosRestConnection, TaosRestCursor
conn: TaosRestConnection = connect(host="localhost", conn: TaosRestConnection = connect(url="http://localhost:6041",
user="root", user="root",
password="taosdata", password="taosdata",
port=6041,
timeout=30) timeout=30)
# ANCHOR_END: connect # ANCHOR_END: connect

View File

@ -1,6 +1,6 @@
from taosrest import RestClient from taosrest import RestClient
client = RestClient("localhost", 6041, "root", "taosdata") client = RestClient("http://localhost:6041", user="root", password="taosdata")
res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1") res: dict = client.sql("SELECT ts, current FROM power.meters LIMIT 1")
print(res) print(res)

View File

@ -71,7 +71,7 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataGetData(p1_, r_) \ #define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_)) ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
#define IS_JSON_NULL(type,data) ((type) == TSDB_DATA_TYPE_JSON && *(data) == TSDB_DATA_TYPE_NULL) #define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && *(data) == TSDB_DATA_TYPE_NULL)
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (!pColumnInfoData->hasNull) { if (!pColumnInfoData->hasNull) {
@ -180,7 +180,7 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
*(double*)p = *(double*)v; *(double*)p = *(double*)v;
} }
int32_t getJsonValueLen(const char *data); int32_t getJsonValueLen(const char* data);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
@ -223,7 +223,8 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress); void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress);
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData); const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
void blockDebugShowData(const SArray* dataBlocks, const char* flag); void blockDebugShowData(const SArray* dataBlocks, const char* flag);
@ -231,6 +232,8 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId); const char* stbFullName, int32_t vgId);

View File

@ -145,26 +145,6 @@ SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit);
#if 0 #if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput); int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
static FORCE_INLINE int32_t streamEnqueue1(SStreamQueue* queue, SStreamQueueItem* pItem) {
int8_t inputStatus = atomic_load_8(&queue->enqueueStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
if (pSubmitClone == NULL) {
atomic_store_8(&queue->enqueueStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
taosWriteQitem(queue->queue, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK) {
taosWriteQitem(queue->queue, pItem);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(queue->queue, pItem);
}
return 0;
}
return 0;
}
#endif #endif
typedef struct { typedef struct {

View File

@ -185,7 +185,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357) #define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358) #define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0359) #define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A) #define TSDB_CODE_MND_CANT_DROP_LEADER TAOS_DEF_ERROR_CODE(0, 0x035A)
// mnode-acct // mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)

View File

@ -195,7 +195,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->withTbName = -1; conf->withTbName = false;
conf->autoCommit = true; conf->autoCommit = true;
conf->autoCommitInterval = 5000; conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
@ -256,13 +256,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcmp(key, "msg.with.table.name") == 0) { if (strcmp(key, "msg.with.table.name") == 0) {
if (strcmp(value, "true") == 0) { if (strcmp(value, "true") == 0) {
conf->withTbName = 1; conf->withTbName = true;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) { } else if (strcmp(value, "false") == 0) {
conf->withTbName = 0; conf->withTbName = false;
return TMQ_CONF_OK;
} else if (strcmp(value, "none") == 0) {
conf->withTbName = -1;
return TMQ_CONF_OK; return TMQ_CONF_OK;
} else { } else {
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;

View File

@ -18,6 +18,7 @@
#include "tcompare.h" #include "tcompare.h"
#include "tglobal.h" #include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "tname.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) { int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0; pEp->port = 0;
@ -99,7 +100,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData) {
// TODO // TODO
} }
int32_t getJsonValueLen(const char *data) { int32_t getJsonValueLen(const char* data) {
int32_t dataLen = 0; int32_t dataLen = 0;
if (*data == TSDB_DATA_TYPE_NULL) { if (*data == TSDB_DATA_TYPE_NULL) {
dataLen = CHAR_BYTES; dataLen = CHAR_BYTES;
@ -109,7 +110,7 @@ int32_t getJsonValueLen(const char *data) {
dataLen = DOUBLE_BYTES + CHAR_BYTES; dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*data == TSDB_DATA_TYPE_BOOL) { } else if (*data == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES + CHAR_BYTES; dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*data & TD_TAG_JSON) { // json string } else if (*data & TD_TAG_JSON) { // json string
dataLen = ((STag*)(data))->len; dataLen = ((STag*)(data))->len;
} else { } else {
ASSERT(0); ASSERT(0);
@ -137,7 +138,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t dataLen = 0; int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) { if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData); dataLen = getJsonValueLen(pData);
}else { } else {
dataLen = varDataTLen(pData); dataLen = varDataTLen(pData);
} }
@ -1283,7 +1284,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
if (n % 8 == 0) { if (n % 8 == 0) {
memmove(nullBitmap, nullBitmap + n / 8, newLen); memmove(nullBitmap, nullBitmap + n / 8, newLen);
} else { } else {
int32_t tail = n % 8; int32_t tail = n % 8;
int32_t i = 0; int32_t i = 0;
uint8_t* p = (uint8_t*)nullBitmap; uint8_t* p = (uint8_t*)nullBitmap;
@ -1301,7 +1302,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
} }
} else if (n > 8) { } else if (n > 8) {
int32_t gap = len - newLen; int32_t gap = len - newLen;
while(i < newLen) { while (i < newLen) {
uint8_t v = p[i + gap]; uint8_t v = p[i + gap];
p[i] = (v << tail); p[i] = (v << tail);
@ -1316,7 +1317,6 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
} }
} }
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
@ -1544,7 +1544,8 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
* *
* TODO: colId should be set * TODO: colId should be set
*/ */
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks); int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq); int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
@ -1585,12 +1586,12 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t dataLen = 0; int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
bool isStartKey = false; bool isStartKey = false;
int32_t offset = 0; int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
STColumn* pCol = &pTSchema->columns[k]; STColumn* pCol = &pTSchema->columns[k];
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
if (!isStartKey) { if (!isStartKey) {
@ -1599,15 +1600,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
offset, k); offset, k);
} else { } else {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
true, offset, k);
} }
break; break;
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break; break;
} }
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break; break;
} }
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
@ -1645,7 +1649,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
} }
break; break;
} }
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
} }
dataLen += TD_ROW_LEN(rb.pBuf); dataLen += TD_ROW_LEN(rb.pBuf);
#ifdef TD_DEBUG_PRINT_ROW #ifdef TD_DEBUG_PRINT_ROW
@ -1681,11 +1685,38 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
SArray* tags = taosArrayInit(0, sizeof(void*));
SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv));
pTag->key = "group_id";
pTag->keyLen = strlen(pTag->key);
pTag->type = TSDB_DATA_TYPE_UBIGINT;
pTag->u = groupId;
taosArrayPush(tags, &pTag);
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
RandTableName rname = {
.tags = tags,
.sTableName = stbName,
.sTableNameLen = strlen(stbName),
.childTableName = cname,
};
buildChildTableName(&rname);
taosMemoryFree(pTag);
taosArrayDestroy(tags);
ASSERT(rname.childTableName && rname.childTableName[0]);
return rname.childTableName;
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) { const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if(!tagArray) { if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -1703,15 +1734,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
if (createTb) { if (createTb) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname; createTbReq.name = cname;
createTbReq.flags = 0; createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = 1, STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT, .type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId, .pData = (uint8_t*)&pDataBlock->info.groupId,

View File

@ -131,7 +131,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
dTrace("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pRpc->msgType)); dTrace("failed to process msg:%p since %s, handle:%p", pMsg, terrstr(), pRpc->info.handle);
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
if (IsReq(pRpc)) { if (IsReq(pRpc)) {
@ -149,8 +150,10 @@ _OVER:
} }
} }
dTrace("msg:%p, is freed", pMsg); if (pMsg != NULL) {
taosFreeQitem(pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
}
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }

View File

@ -318,9 +318,9 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) { void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
dmGetMnodeEpSet(pData, pEpSet); dmGetMnodeEpSet(pData, pEpSet);
dDebug("msg:%p, is redirected, num:%d use:%d", pMsg, pEpSet->numOfEps, pEpSet->inUse); dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); dTrace("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) { if (strcmp(pEpSet->eps[i].fqdn, tsLocalFqdn) == 0 && pEpSet->eps[i].port == tsServerPort) {
pEpSet->inUse = (i + 1) % pEpSet->numOfEps; pEpSet->inUse = (i + 1) % pEpSet->numOfEps;
} }

View File

@ -36,19 +36,19 @@ typedef enum {
} ETrnAct; } ETrnAct;
typedef struct { typedef struct {
int32_t id; int32_t id;
int32_t errCode; int32_t errCode;
int32_t acceptableCode; int32_t acceptableCode;
int8_t stage; ETrnStage stage;
ETrnAct actionType; ETrnAct actionType;
int8_t rawWritten; int8_t rawWritten;
int8_t msgSent; int8_t msgSent;
int8_t msgReceived; int8_t msgReceived;
tmsg_t msgType; tmsg_t msgType;
SEpSet epSet; SEpSet epSet;
int32_t contLen; int32_t contLen;
void *pCont; void *pCont;
SSdbRaw *pRaw; SSdbRaw *pRaw;
} STransAction; } STransAction;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);

View File

@ -87,6 +87,10 @@ int32_t mndInitDnode(SMnode *pMnode) {
void mndCleanupDnode(SMnode *pMnode) {} void mndCleanupDnode(SMnode *pMnode) {}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) { static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
SDnodeObj dnodeObj = {0}; SDnodeObj dnodeObj = {0};
dnodeObj.id = 1; dnodeObj.id = 1;
dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.createdTime = taosGetTimestampMs();
@ -95,54 +99,42 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN); memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pRaw == NULL) return -1; if (pTrans == NULL) goto _OVER;
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1; mDebug("trans:%d, used to create dnode:%s on first deploy", pTrans->id, dnodeObj.ep);
mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw); pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
}
mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); code = 0;
mndTransDrop(pTrans);
return -1;
}
_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; sdbFreeRaw(pRaw);
return code;
} }
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_DNODE, TSDB_DNODE_VER_NUMBER, sizeof(SDnodeObj) + TSDB_DNODE_RESERVE_SIZE);
if (pRaw == NULL) goto DNODE_ENCODE_OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pDnode->id, DNODE_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pDnode->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, DNODE_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, DNODE_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime, _OVER)
SDB_SET_INT16(pRaw, dataPos, pDnode->port, DNODE_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pDnode->port, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, DNODE_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, DNODE_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, DNODE_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, _OVER);
terrno = 0; terrno = 0;
DNODE_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr()); mError("dnode:%d, failed to encode to raw:%p since %s", pDnode->id, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
@ -154,33 +146,32 @@ DNODE_ENCODE_OVER:
} }
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SSdbRow *pRow = NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto DNODE_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_DNODE_VER_NUMBER) { if (sver != TSDB_DNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto DNODE_DECODE_OVER; goto _OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj)); pRow = sdbAllocRow(sizeof(SDnodeObj));
if (pRow == NULL) goto DNODE_DECODE_OVER; if (pRow == NULL) goto _OVER;
SDnodeObj *pDnode = sdbGetRowObj(pRow); SDnodeObj *pDnode = sdbGetRowObj(pRow);
if (pDnode == NULL) goto DNODE_DECODE_OVER; if (pDnode == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pDnode->id, DNODE_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pDnode->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, DNODE_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pDnode->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, DNODE_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pDnode->updateTime, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pDnode->port, DNODE_DECODE_OVER) SDB_GET_INT16(pRaw, dataPos, &pDnode->port, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, DNODE_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, DNODE_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
DNODE_DECODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("dnode:%d, failed to decode from raw:%p since %s", pDnode->id, pRaw, terrstr()); mError("dnode:%d, failed to decode from raw:%p since %s", pDnode->id, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
@ -246,6 +237,7 @@ static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) {
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
} }
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return NULL; return NULL;
} }
@ -290,25 +282,26 @@ static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
} }
} }
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) {
if (pCfg->statusInterval != tsStatusInterval) { if (pCfg->statusInterval != tsStatusInterval) {
mError("statusInterval [%d - %d] cfg inconsistent", pCfg->statusInterval, tsStatusInterval); mError("dnode:%d, statusInterval:%d inconsistent with cluster:%d", pDnode->id, pCfg->statusInterval,
tsStatusInterval);
return DND_REASON_STATUS_INTERVAL_NOT_MATCH; return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
} }
if ((0 != strcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) { if ((0 != strcasecmp(pCfg->timezone, tsTimezoneStr)) && (pMnode->checkTime != pCfg->checkTime)) {
mError("timezone [%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, tsTimezoneStr, mError("dnode:%d, timezone:%s checkTime:%" PRId64 " inconsistent with cluster %s %" PRId64, pDnode->id,
pCfg->checkTime, pMnode->checkTime); pCfg->timezone, pCfg->checkTime, tsTimezoneStr, pMnode->checkTime);
return DND_REASON_TIME_ZONE_NOT_MATCH; return DND_REASON_TIME_ZONE_NOT_MATCH;
} }
if (0 != strcasecmp(pCfg->locale, tsLocale)) { if (0 != strcasecmp(pCfg->locale, tsLocale)) {
mError("locale [%s - %s] cfg inconsistent", pCfg->locale, tsLocale); mError("dnode:%d, locale:%s inconsistent with cluster:%s", pDnode->id, pCfg->locale, tsLocale);
return DND_REASON_LOCALE_NOT_MATCH; return DND_REASON_LOCALE_NOT_MATCH;
} }
if (0 != strcasecmp(pCfg->charset, tsCharset)) { if (0 != strcasecmp(pCfg->charset, tsCharset)) {
mError("charset [%s - %s] cfg inconsistent.", pCfg->charset, tsCharset); mError("dnode:%d, charset:%s inconsistent with cluster:%s", pDnode->id, pCfg->charset, tsCharset);
return DND_REASON_CHARSET_NOT_MATCH; return DND_REASON_CHARSET_NOT_MATCH;
} }
@ -323,15 +316,14 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq) != 0) { if (tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto PROCESS_STATUS_MSG_OVER; goto _OVER;
} }
if (statusReq.dnodeId == 0) { if (statusReq.dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode == NULL) { if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", statusReq.dnodeEp); mDebug("dnode:%s, not created yet", statusReq.dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; goto _OVER;
goto PROCESS_STATUS_MSG_OVER;
} }
} else { } else {
pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId); pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
@ -341,13 +333,11 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
} }
mError("dnode:%d, %s not exist", statusReq.dnodeId, statusReq.dnodeEp); mError("dnode:%d, %s not exist", statusReq.dnodeId, statusReq.dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; goto _OVER;
goto PROCESS_STATUS_MSG_OVER;
} }
} }
int32_t numOfVloads = (int32_t)taosArrayGetSize(statusReq.pVloads); for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
for (int32_t v = 0; v < numOfVloads; ++v) {
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
@ -366,6 +356,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
roleChanged = true; roleChanged = true;
} }
pVgroup->vnodeGid[vg].role = pVload->syncState; pVgroup->vnodeGid[vg].role = pVload->syncState;
break;
} }
} }
if (roleChanged) { if (roleChanged) {
@ -405,11 +396,11 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion); mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE; terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
goto PROCESS_STATUS_MSG_OVER; goto _OVER;
} }
if (statusReq.dnodeId == 0) { if (statusReq.dnodeId == 0) {
mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); mInfo("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else { } else {
if (statusReq.clusterId != pMnode->clusterId) { if (statusReq.clusterId != pMnode->clusterId) {
if (pDnode != NULL) { if (pDnode != NULL) {
@ -418,7 +409,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId, mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
pMnode->clusterId); pMnode->clusterId);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
goto PROCESS_STATUS_MSG_OVER; goto _OVER;
} else { } else {
pDnode->accessTimes++; pDnode->accessTimes++;
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
@ -426,18 +417,17 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
} }
// Verify whether the cluster parameters are consistent when status change from offline to ready // Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t ret = mndCheckClusterCfgPara(pMnode, &statusReq.clusterCfg); pDnode->offlineReason = mndCheckClusterCfgPara(pMnode, pDnode, &statusReq.clusterCfg);
if (0 != ret) { if (pDnode->offlineReason != 0) {
pDnode->offlineReason = ret; mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[pDnode->offlineReason]);
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG; terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
goto PROCESS_STATUS_MSG_OVER; goto _OVER;
} }
if (!online) { if (!online) {
mInfo("dnode:%d, from offline to online", pDnode->id); mInfo("dnode:%d, from offline to online", pDnode->id);
} else { } else {
mDebug("dnode:%d, send dnode epset, online:%d ver:% " PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online, mDebug("dnode:%d, send dnode epset, online:%d dnode_ver:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
statusReq.dnodeVer, dnodeVer, reboot); statusReq.dnodeVer, dnodeVer, reboot);
} }
@ -452,7 +442,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
if (statusRsp.pDnodeEps == NULL) { if (statusRsp.pDnodeEps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto PROCESS_STATUS_MSG_OVER; goto _OVER;
} }
mndGetDnodeData(pMnode, statusRsp.pDnodeEps); mndGetDnodeData(pMnode, statusRsp.pDnodeEps);
@ -469,13 +459,17 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->lastAccessTime = curMs; pDnode->lastAccessTime = curMs;
code = 0; code = 0;
PROCESS_STATUS_MSG_OVER: _OVER:
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
taosArrayDestroy(statusReq.pVloads); taosArrayDestroy(statusReq.pVloads);
return code; return code;
} }
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) { static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
SDnodeObj dnodeObj = {0}; SDnodeObj dnodeObj = {0};
dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE); dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.createdTime = taosGetTimestampMs();
@ -484,29 +478,22 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) { if (pTrans == NULL) goto _OVER;
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
}
mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep); mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
SSdbRaw *pCommitRaw = mndDnodeActionEncode(&dnodeObj); pRaw = mndDnodeActionEncode(&dnodeObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransDrop(pTrans); pRaw = NULL;
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); code = 0;
mndTransDrop(pTrans);
return -1;
}
_OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; sdbFreeRaw(pRaw);
return code;
} }
static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
@ -518,38 +505,37 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_DNODE_OVER; goto _OVER;
} }
mDebug("dnode:%s:%d, start to create", createReq.fqdn, createReq.port); mDebug("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) { if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
terrno = TSDB_CODE_MND_INVALID_DNODE_EP; terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
goto CREATE_DNODE_OVER; goto _OVER;
} }
char ep[TSDB_EP_LEN]; char ep[TSDB_EP_LEN];
snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port); snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
pDnode = mndAcquireDnodeByEp(pMnode, ep); pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode != NULL) { if (pDnode != NULL) {
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST; goto _OVER;
goto CREATE_DNODE_OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->conn.user); pUser = mndAcquireUser(pMnode, pReq->conn.user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_DNODE_OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser) != 0) { if (mndCheckNodeAuth(pUser) != 0) {
goto CREATE_DNODE_OVER; goto _OVER;
} }
code = mndCreateDnode(pMnode, pReq, &createReq); code = mndCreateDnode(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
CREATE_DNODE_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr()); mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr());
} }
@ -559,7 +545,7 @@ CREATE_DNODE_OVER:
return code; return code;
} }
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj) { static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, int32_t numOfVnodes) {
int32_t code = -1; int32_t code = -1;
SSdbRaw *pRaw = NULL; SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL; STrans *pTrans = NULL;
@ -579,8 +565,12 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
pRaw = NULL; pRaw = NULL;
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj) != 0) goto _OVER; if (pMObj != NULL) {
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id) != 0) goto _OVER; if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj) != 0) goto _OVER;
}
if (numOfVnodes > 0) {
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id) != 0) goto _OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
@ -617,11 +607,6 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
goto _OVER;
}
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId); pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pMObj != NULL) { if (pMObj != NULL) {
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) { if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
@ -629,7 +614,17 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
if (pMnode->selfDnodeId == dropReq.dnodeId) { if (pMnode->selfDnodeId == dropReq.dnodeId) {
terrno = TSDB_CODE_MND_CANT_DROP_MASTER; terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
goto _OVER;
}
}
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if (numOfVnodes > 0 || pMObj != NULL) {
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, has_mnode:%d numOfVnodes:%d", pDnode->id, terrstr(), pMObj != NULL,
numOfVnodes);
goto _OVER; goto _OVER;
} }
} }
@ -644,7 +639,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
code = mndDropDnode(pMnode, pReq, pDnode, pMObj); code = mndDropDnode(pMnode, pReq, pDnode, pMObj, numOfVnodes);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER: _OVER:
@ -669,7 +664,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr()); mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr());
return -1; return -1;
} }
@ -679,17 +673,18 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
int32_t bufLen = tSerializeSMCfgDnodeReq(NULL, 0, &cfgReq); int32_t bufLen = tSerializeSMCfgDnodeReq(NULL, 0, &cfgReq);
void *pBuf = rpcMallocCont(bufLen); void *pBuf = rpcMallocCont(bufLen);
if (pBuf == NULL) return -1;
tSerializeSMCfgDnodeReq(pBuf, bufLen, &cfgReq); tSerializeSMCfgDnodeReq(pBuf, bufLen, &cfgReq);
mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, pReq->info.ahandle);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle);
return tmsgSendReq(&epSet, &rpcMsg); return tmsgSendReq(&epSet, &rpcMsg);
} }
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle); mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle);
return TSDB_CODE_SUCCESS; return 0;
} }
static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
@ -698,7 +693,7 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t numOfRows = 0; int32_t numOfRows = 0;
char *cfgOpts[TSDB_CONFIG_NUMBER] = {0}; char *cfgOpts[TSDB_CONFIG_NUMBER] = {0};
char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0}; char cfgVals[TSDB_CONFIG_NUMBER][TSDB_CONIIG_VALUE_LEN + 1] = {0};
char *pWrite; char *pWrite = NULL;
int32_t cols = 0; int32_t cols = 0;
cfgOpts[totalRows] = "statusInterval"; cfgOpts[totalRows] = "statusInterval";
@ -724,7 +719,6 @@ static int32_t mndRetrieveConfigs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
cols = 0; cols = 0;
STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN); STR_WITH_MAXSIZE_TO_VARSTR(buf, cfgOpts[i], TSDB_CONFIG_OPTION_LEN);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)buf, false); colDataAppend(pColInfo, numOfRows, (const char *)buf, false);
@ -774,7 +768,6 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char b1[9] = {0}; char b1[9] = {0};
STR_TO_VARSTR(b1, online ? "ready" : "offline"); STR_TO_VARSTR(b1, online ? "ready" : "offline");
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false); colDataAppend(pColInfo, numOfRows, b1, false);
@ -792,7 +785,6 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
} }
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
return numOfRows; return numOfRows;
} }

View File

@ -588,7 +588,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
} }
if (pMnode->selfDnodeId == dropReq.dnodeId) { if (pMnode->selfDnodeId == dropReq.dnodeId) {
terrno = TSDB_CODE_MND_CANT_DROP_MASTER; terrno = TSDB_CODE_MND_CANT_DROP_LEADER;
goto _OVER; goto _OVER;
} }

View File

@ -193,7 +193,7 @@ int32_t mndInitSync(SMnode *pMnode) {
void mndCleanupSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncStop(pMgmt->sync); syncStop(pMgmt->sync);
mDebug("sync:%" PRId64 " is stopped", pMgmt->sync); mDebug("mnode sync is stopped, id:%" PRId64, pMgmt->sync);
tsem_destroy(&pMgmt->syncSem); tsem_destroy(&pMgmt->syncSem);
if (pMgmt->pWal != NULL) { if (pMgmt->pWal != NULL) {

View File

@ -117,10 +117,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->conflict, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->exec, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
@ -256,15 +256,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER)
int16_t stage = 0; int8_t stage = 0;
int16_t policy = 0; int8_t policy = 0;
int16_t conflict = 0; int8_t conflict = 0;
int16_t exec = 0; int8_t exec = 0;
int8_t actionType = 0; int8_t actionType = 0;
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT8(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT16(pRaw, dataPos, &conflict, _OVER) SDB_GET_INT8(pRaw, dataPos, &conflict, _OVER)
SDB_GET_INT16(pRaw, dataPos, &exec, _OVER) SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
pTrans->stage = stage; pTrans->stage = stage;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->conflict = conflict; pTrans->conflict = conflict;
@ -290,7 +290,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType; action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
if (action.actionType == TRANS_ACTION_RAW) { if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
@ -322,7 +323,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType; action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
if (action.actionType == TRANS_ACTION_RAW) { if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
@ -354,7 +356,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType; action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
if (action.actionType) { if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
@ -878,7 +881,6 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action); mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
} }
pAction->errCode = 0; pAction->errCode = 0;
} }
} }
@ -890,11 +892,12 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
pAction->rawWritten = true; pAction->rawWritten = true;
pAction->errCode = 0; pAction->errCode = 0;
code = 0; code = 0;
mDebug("trans:%d, %s:%d write to sdb", pTrans->id, mndTransStr(pAction->stage), pAction->id); mDebug("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
} else { } else {
pAction->errCode = (terrno != 0) ? terrno : code; pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d failed to write sdb since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage),
terrstr()); pAction->id, terrstr(), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
} }
return code; return code;
@ -916,18 +919,26 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
} }
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
char detail[1024] = {0};
int32_t len = snprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", TMSG_INFO(pAction->msgType),
pAction->epSet.numOfEps, pAction->epSet.inUse);
for (int32_t i = 0; i < pTrans->lastErrorEpset.numOfEps; ++i) {
len += snprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, pAction->epSet.eps[i].fqdn,
pAction->epSet.eps[i].port);
}
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg); int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
if (code == 0) { if (code == 0) {
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
mDebug("trans:%d, %s:%d is sent to %s:%u", pTrans->id, mndTransStr(pAction->stage), pAction->id, mDebug("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port);
} else { } else {
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = (terrno != 0) ? terrno : code; pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d not send since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr()); mError("trans:%d, %s:%d not send since %s, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr(),
detail);
} }
return code; return code;
@ -1424,9 +1435,9 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
if (epset.numOfEps > 0) { if (epset.numOfEps > 0) {
len += snprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ", len += snprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ",
TMSG_INFO(pTrans->lastErrorMsgType), epset.numOfEps, epset.inUse); TMSG_INFO(pTrans->lastErrorMsgType), epset.numOfEps, epset.inUse);
} for (int32_t i = 0; i < pTrans->lastErrorEpset.numOfEps; ++i) {
for (int32_t i = 0; i < pTrans->lastErrorEpset.numOfEps; ++i) { len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port); }
} }
} }
STR_WITH_MAXSIZE_TO_VARSTR(lastError, detail, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(lastError, detail, pShow->pMeta->pSchemas[cols].bytes);

View File

@ -392,6 +392,7 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply);
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len); int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len);
const char *sdbTableName(ESdbType type); const char *sdbTableName(ESdbType type);
const char *sdbStatusName(ESdbStatus status);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw); int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw);

View File

@ -65,7 +65,7 @@ const char *sdbTableName(ESdbType type) {
} }
} }
static const char *sdbStatusName(ESdbStatus status) { const char *sdbStatusName(ESdbStatus status) {
switch (status) { switch (status) {
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
return "creating"; return "creating";

View File

@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tstream.h" #include "streamInc.h"
#if 0 #if 0
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) { int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput) {
@ -74,7 +74,6 @@ FAIL:
} }
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
//
atomic_add_fetch_32(pDataSubmit->dataRef, 1); atomic_add_fetch_32(pDataSubmit->dataRef, 1);
} }

View File

@ -116,11 +116,9 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
*ppEpSet = &pTask->fixedEpDispatcher.epSet; *ppEpSet = &pTask->fixedEpDispatcher.epSet;
downstreamTaskId = pTask->fixedEpDispatcher.taskId; downstreamTaskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO get ctbName // TODO get ctbName for each block
char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0};
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0); SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
// get vg and ep
// TODO: get hash function by hashMethod // TODO: get hash function by hashMethod
// get groupId, compute hash value // get groupId, compute hash value

View File

@ -188,9 +188,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_ALREADY_EXIST, "Bnode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_ALREADY_EXIST, "Bnode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_NOT_EXIST, "Bnode not there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_NOT_EXIST, "Bnode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replicas of mnode cannot less than 1") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replicas of mnode cannot exceed 3") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CANT_DROP_MASTER, "Can't drop mnode which is leader") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CANT_DROP_LEADER, "Cannot drop mnode which is leader")
// mnode-acct // mnode-acct
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists")

View File

@ -21,7 +21,7 @@
./test.sh -f tsim/db/taosdlog.sim ./test.sh -f tsim/db/taosdlog.sim
# ---- dnode # ---- dnode
./test.sh -f tsim/dnode/basic1.sim ./test.sh -f tsim/dnode/create_dnode.sim
# ---- insert # ---- insert
./test.sh -f tsim/insert/basic0.sim ./test.sh -f tsim/insert/basic0.sim

View File

@ -31,7 +31,7 @@ if $data[0][4] != ready then
goto check_dnode_ready goto check_dnode_ready
endi endi
#sql connect sql connect
sql create dnode $hostname port 7200 sql create dnode $hostname port 7200
sql create dnode $hostname port 7300 sql create dnode $hostname port 7300
sql create dnode $hostname port 7400 sql create dnode $hostname port 7400
@ -71,7 +71,7 @@ sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0 $loop_cnt = 0
check_db_ready: check_db_ready:
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
sleep 200 sleep 20
if $loop_cnt == 10 then if $loop_cnt == 10 then
print ====> db not ready! print ====> db not ready!
return -1 return -1
@ -93,13 +93,12 @@ $loop_cnt = 0
check_vg_ready: check_vg_ready:
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
sleep 200 sleep 200
if $loop_cnt == 10 then if $loop_cnt == 300 then
print ====> vgroups not ready! print ====> vgroups not ready!
return -1 return -1
endi endi
sql show vgroups sql show vgroups
print ===> rows: $rows print ===> rows: $rows
print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13]
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13] print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13]
if $rows != $vgroups then if $rows != $vgroups then
return -1 return -1
@ -132,10 +131,12 @@ if $data[0][8] == leader then
goto check_vg_ready goto check_vg_ready
endi endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7] print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
goto vg_ready goto vg_ready
else
goto check_vg_ready
endi endi
vg_ready:
vg_ready:
print ====> create stable/child table print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
@ -156,27 +157,13 @@ while $i < $tbNum
sql create table $ctb using stb tags( $i ) sql create table $ctb using stb tags( $i )
$ntb = $ntbPrefix . $i $ntb = $ntbPrefix . $i
sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
# $x = 0
# while $x < $rowNum
# $binary = ' . binary
# $binary = $binary . $i
# $binary = $binary . '
#
# sql insert into $ctb values ($tstart , $i , $x , $binary )
# sql insert into $ntb values ($tstart , 999 , 999 , 'binary-ntb' )
# $tstart = $tstart + 1
# $x = $x + 1
# endw
# print ====> insert rows: $rowNum into $ctb and $ntb
$i = $i + 1 $i = $i + 1
# $tstart = 1640966400000
endw endw
$totalTblNum = $tbNum * 2 $totalTblNum = $tbNum * 2
sleep 1000
sql show tables sql show tables
print ====> expect $totalTblNum and infinsert $rows in fact
if $rows != $totalTblNum then if $rows != $totalTblNum then
return -1 return -1
endi endi
@ -185,7 +172,7 @@ print ====> create a normal table for interaction between main and back threads
sql create table interaction (ts timestamp, flag binary(10), childrows int, stbrows int) sql create table interaction (ts timestamp, flag binary(10), childrows int, stbrows int)
print ====> start to run_back to insert data print ====> start to run_back to insert data
run_back tsim/tmq/insertDataByRunBack.sim run_back tsim/sync/insertDataByRunBack.sim
print ====> waiting insert thread starting insert data print ====> waiting insert thread starting insert data
@ -222,12 +209,15 @@ endi
$dnodeId = dnode . $dnodeId $dnodeId = dnode . $dnodeId
print ====> stop $dnodeId print ====> stop $dnodeId
system sh/exec.sh -n $dnodeId -s stop -x SIGINT system sh/exec.sh -n $dnodeId -s stop -x SIGINT
sleep 1000
print ====> start $dnodeId
system sh/exec.sh -n $dnodeId -s start
$loop_cnt = 0 $loop_cnt = 0
check_vg_ready_2: check_vg_ready_2:
$loop_cnt = $loop_cnt + 1 $loop_cnt = $loop_cnt + 1
sleep 200 sleep 200
if $loop_cnt == 10 then if $loop_cnt == 300 then
print ====> vgroups switch fail!!! print ====> vgroups switch fail!!!
return -1 return -1
endi endi
@ -242,38 +232,38 @@ if $data[0][4] == leader then
if $data[0][6] != NULL then if $data[0][6] != NULL then
goto check_vg_ready_2 goto check_vg_ready_2
endi endi
if $data[0][8] != NULL then if $data[0][8] != FOLLOWER then
goto check_vg_ready_2 goto check_vg_ready_2
endi endi
print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] print ---- vgroup $dnodeId leader switch to dnode $data[0][3]
goto vg_ready_2 goto vg_ready_2
endi endi
if $data[0][6] == leader then if $data[0][6] == leader then
if $data[0][4] != NULL then if $data[0][4] != NULL then
goto check_vg_ready_2 goto check_vg_ready_2
endi endi
if $data[0][8] != NULL then if $data[0][8] != FOLLOWER then
goto check_vg_ready_2 goto check_vg_ready_2
endi endi
print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] print ---- vgroup $dnodeId leader switch to dnode $data[0][5]
goto vg_ready_2 goto vg_ready_2
endi endi
if $data[0][8] == leader then if $data[0][8] == leader then
if $data[0][4] != NULL then if $data[0][4] != NULL then
goto check_vg_ready_2 goto check_vg_ready_2
endi endi
if $data[0][6] != NULL then if $data[0][6] != FOLLOWER then
goto check_vg_ready_2 goto check_vg_ready_2
endi endi
print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] print ---- vgroup $dnodeId leader switch to dnode $data[0][7]
goto vg_ready_2 goto vg_ready_2
else
goto check_vg_ready_2
endi endi
vg_ready_2: vg_ready_2:
$switch_loop_cnt = $switch_loop_cnt + 1 $switch_loop_cnt = $switch_loop_cnt + 1
if $switch_loop_cnt < 3 then if $switch_loop_cnt < 3 then
print ====> start $dnodeId
system sh/exec.sh -n $dnodeId -s start
goto switch_leader_loop goto switch_leader_loop
endi endi
@ -347,7 +337,7 @@ if $data[0][4] == leader then
if $data[0][6] != NULL then if $data[0][6] != NULL then
goto check_vg_ready_1 goto check_vg_ready_1
endi endi
if $data[0][8] != NULL then if $data[0][8] != FOLLOWER then
goto check_vg_ready_1 goto check_vg_ready_1
endi endi
goto vg_ready_1 goto vg_ready_1
@ -356,7 +346,7 @@ if $data[0][6] == leader then
if $data[0][4] != NULL then if $data[0][4] != NULL then
goto check_vg_ready_1 goto check_vg_ready_1
endi endi
if $data[0][8] != NULL then if $data[0][8] != FOLLOWER then
goto check_vg_ready_1 goto check_vg_ready_1
endi endi
goto vg_ready_1 goto vg_ready_1
@ -365,7 +355,7 @@ if $data[0][8] == leader then
if $data[0][4] != NULL then if $data[0][4] != NULL then
goto check_vg_ready_1 goto check_vg_ready_1
endi endi
if $data[0][6] != NULL then if $data[0][6] != FOLLOWER then
goto check_vg_ready_1 goto check_vg_ready_1
endi endi
goto vg_ready_1 goto vg_ready_1
@ -394,6 +384,73 @@ if $data[0][0] != $totalRowsOfStb then
return -1 return -1
endi endi
print ====> once stop one dnode by loop, and do query every time
$i = 2
loop_stop_dnode:
$dnodeId = dnode . $i
print ====> stop $dnodeId
system sh/exec.sh -n $dnodeId -s stop -x SIGINT
check_vg_ready_3:
sql show vgroups
print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13]
if $data[0][4] == LEADER then
if $data[0][6] == LEADER then
goto check_vg_ready_3
endi
if $data[0][8] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][5]
elif $data[0][6] == LEADER then
if $data[0][4] == LEADER then
goto check_vg_ready_3
endi
if $data[0][8] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then
goto check_vg_ready_3
endi
if $data[0][6] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][9]
else
print ====> no leader vnode!!!
return -1
endi
sql select count(*) from ntb0
print rows: $rows
print $data[0][0] $data[0][1]
if $data[0][0] != $totalRowsOfCtb then
return -1
endi
sql select count(*) from ctb0
print rows: $rows
print $data[0][0] $data[0][1]
if $data[0][0] != $totalRowsOfCtb then
return -1
endi
sql select count(*) from stb
print rows: $rows
print $data[0][0] $data[0][1]
if $data[0][0] != $totalRowsOfStb then
return -1
endi
$i = $i + 1
if $i <= 4 then
print ====> start $dnodeId
system sh/exec.sh -n $dnodeId -s start
goto loop_stop_dnode
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT

View File

@ -80,93 +80,9 @@ class TDTestCase:
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
print(con) print(con)
return con return con
def test_stmt_insert_multi(self,conn):
# type: (TaosConnection) -> None
dbname = "pytest_taos_stmt_multi"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
# conn.load_table_info("log")
start = datetime.now()
stmt = conn.statement("insert into log values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
# print(type(stmt))
stmt.bind_param_batch(params)
stmt.execute()
end = datetime.now()
print("elapsed time: ", end - start)
assert stmt.affected_rows == 3
#query
querystmt=conn.statement("select ?,bu from log")
queryparam=new_bind_params(1)
print(type(queryparam))
queryparam[0].binary("ts")
querystmt.bind_param(queryparam)
querystmt.execute()
result=querystmt.use_result()
# rows=result.fetch_all()
# print( querystmt.use_result())
# result = conn.query("select * from log")
rows=result.fetch_all()
# rows=result.fetch_all()
print(rows)
assert rows[1][0] == "ts"
assert rows[0][1] == 3
#query
querystmt1=conn.statement("select * from log where bu < ?")
queryparam1=new_bind_params(1)
print(type(queryparam1))
queryparam1[0].int(4)
querystmt1.bind_param(queryparam1)
querystmt1.execute()
result1=querystmt1.use_result()
rows1=result1.fetch_all()
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000"
assert rows1[0][10] == 3
stmt.close()
# conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
# conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def test_stmt_set_tbname_tag(self,conn): def test_stmt_set_tbname_tag(self,conn):
dbname = "pytest_taos_stmt_set_tbname_tag" dbname = "stmt_set_tbname_tag"
try: try:
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
@ -174,16 +90,16 @@ class TDTestCase:
conn.select_db(dbname) conn.select_db(dbname)
conn.execute("create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\ conn.execute("create table if not exists log(ts timestamp, bo bool, nil tinyint, ti tinyint, si smallint, ii int,\
bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \ bi bigint, tu tinyint unsigned, su smallint unsigned, iu int unsigned, bu bigint unsigned, \
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp) tags (t1 timestamp, t2 bool,\ ff float, dd double, bb binary(100), nn nchar(100), tt timestamp , vc varchar(100)) tags (t1 timestamp, t2 bool,\
t3 tinyint, t4 tinyint, t5 smallint, t6 int, t7 bigint, t8 tinyint unsigned, t9 smallint unsigned, \ t3 tinyint, t4 tinyint, t5 smallint, t6 int, t7 bigint, t8 tinyint unsigned, t9 smallint unsigned, \
t10 int unsigned, t11 bigint unsigned, t12 float, t13 double, t14 binary(100), t15 nchar(100), t16 timestamp)") t10 int unsigned, t11 bigint unsigned, t12 float, t13 double, t14 binary(100), t15 nchar(100), t16 timestamp)")
stmt = conn.statement("insert into ? using log tags (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) \ stmt = conn.statement("insert into ? using log tags (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) \
values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
tags = new_bind_params(16) tags = new_bind_params(16)
tags[0].timestamp(1626861392589123, PrecisionEnum.Microseconds) tags[0].timestamp(1626861392589123, PrecisionEnum.Microseconds)
tags[1].bool(True) tags[1].bool(True)
tags[2].null() tags[2].bool(False)
tags[3].tinyint(2) tags[3].tinyint(2)
tags[4].smallint(3) tags[4].smallint(3)
tags[5].int(4) tags[5].int(4)
@ -198,7 +114,7 @@ class TDTestCase:
tags[14].nchar("stmt") tags[14].nchar("stmt")
tags[15].timestamp(1626861392589, PrecisionEnum.Milliseconds) tags[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
stmt.set_tbname_tags("tb1", tags) stmt.set_tbname_tags("tb1", tags)
params = new_multi_binds(16) params = new_multi_binds(17)
params[0].timestamp((1626861392589111, 1626861392590111, 1626861392591111)) params[0].timestamp((1626861392589111, 1626861392590111, 1626861392591111))
params[1].bool((True, None, False)) params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null params[2].tinyint([-128, -128, None]) # -128 is tinyint null
@ -213,25 +129,32 @@ class TDTestCase:
params[11].float([3, None, 1]) params[11].float([3, None, 1])
params[12].double([3, None, 1.2]) params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None]) params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a? long string with 中文字符"]) params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591]) params[15].timestamp([None, None, 1626861392591])
params[16].binary(["涛思数据16", None, "a long string with 中文-字符"])
stmt.bind_param_batch(params) stmt.bind_param_batch(params)
stmt.execute() stmt.execute()
assert stmt.affected_rows == 3 assert stmt.affected_rows == 3
#query #query all
querystmt1=conn.statement("select * from log where bu < ?") querystmt1=conn.statement("select * from log where bu < ?")
queryparam1=new_bind_params(1) queryparam1=new_bind_params(1)
print(type(queryparam1)) print(type(queryparam1))
queryparam1[0].int(5) queryparam1[0].int(10)
querystmt1.bind_param(queryparam1) querystmt1.bind_param(queryparam1)
querystmt1.execute() querystmt1.execute()
result1=querystmt1.use_result() result1=querystmt1.use_result()
rows1=result1.fetch_all() rows1=result1.fetch_all()
print("1",rows1) print(rows1[0])
print(rows1[1])
print(rows1[2])
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589111"
assert rows1[0][10] == 3
assert rows1[1][10] == 4
#query: Numeric Functions
querystmt2=conn.statement("select abs(?) from log where bu < ?") querystmt2=conn.statement("select abs(?) from log where bu < ?")
queryparam2=new_bind_params(2) queryparam2=new_bind_params(2)
print(type(queryparam2)) print(type(queryparam2))
@ -242,6 +165,11 @@ class TDTestCase:
result2=querystmt2.use_result() result2=querystmt2.use_result()
rows2=result2.fetch_all() rows2=result2.fetch_all()
print("2",rows2) print("2",rows2)
assert rows2[0][0] == 5
assert rows2[1][0] == 5
#query: Numeric Functions and escapes
querystmt3=conn.statement("select abs(?) from log where nn= 'a? long string with 中文字符' ") querystmt3=conn.statement("select abs(?) from log where nn= 'a? long string with 中文字符' ")
queryparam3=new_bind_params(1) queryparam3=new_bind_params(1)
@ -252,9 +180,63 @@ class TDTestCase:
result3=querystmt3.use_result() result3=querystmt3.use_result()
rows3=result3.fetch_all() rows3=result3.fetch_all()
print("3",rows3) print("3",rows3)
# assert str(rows1[0][0]) == "2021-07-21 17:56:32.589111" assert rows3 == []
# assert rows1[0][10] == 3
# assert rows1[1][10] == 4 #query: string Functions
querystmt3=conn.statement("select CHAR_LENGTH(?) from log ")
queryparam3=new_bind_params(1)
print(type(queryparam3))
queryparam3[0].binary('中文字符')
querystmt3.bind_param(queryparam3)
querystmt3.execute()
result3=querystmt3.use_result()
rows3=result3.fetch_all()
print("4",rows3)
assert rows3[0][0] == 12, 'fourth case is failed'
assert rows3[1][0] == 12, 'fourth case is failed'
# #query: conversion Functions
# querystmt4=conn.statement("select cast( ? as bigint) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].binary('1232a')
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("5",rows4)
# assert rows4[0][0] == 1232
# assert rows4[1][0] == 1232
# querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].int(123)
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("6",rows4)
# assert rows4[0][0] == '123'
# assert rows4[1][0] == '123'
# #query: datatime Functions
# querystmt4=conn.statement(" select timediff('2021-07-21 17:56:32.590111',?,1s) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].timestamp(1626861392591111)
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("7",rows4)
# assert rows4[0][0] == 1, 'seventh case is failed'
# assert rows4[1][0] == 1, 'seventh case is failed'
# conn.execute("drop database if exists %s" % dbname) # conn.execute("drop database if exists %s" % dbname)
conn.close() conn.close()
@ -269,8 +251,6 @@ class TDTestCase:
config = buildPath+ "../sim/dnode1/cfg/" config = buildPath+ "../sim/dnode1/cfg/"
host="localhost" host="localhost"
connectstmt=self.newcon(host,config) connectstmt=self.newcon(host,config)
self.test_stmt_insert_multi(connectstmt)
connectstmt=self.newcon(host,config)
self.test_stmt_set_tbname_tag(connectstmt) self.test_stmt_set_tbname_tag(connectstmt)
return return

View File

@ -124,7 +124,7 @@ class TDTestCase:
print("elapsed time: ", end - start) print("elapsed time: ", end - start)
assert stmt.affected_rows == 3 assert stmt.affected_rows == 3
#query #query 1
querystmt=conn.statement("select ?,bu from log") querystmt=conn.statement("select ?,bu from log")
queryparam=new_bind_params(1) queryparam=new_bind_params(1)
print(type(queryparam)) print(type(queryparam))
@ -141,8 +141,9 @@ class TDTestCase:
print(rows) print(rows)
assert rows[1][0] == "ts" assert rows[1][0] == "ts"
assert rows[0][1] == 3 assert rows[0][1] == 3
assert rows[2][1] == None
#query #query 2
querystmt1=conn.statement("select * from log where bu < ?") querystmt1=conn.statement("select * from log where bu < ?")
queryparam1=new_bind_params(1) queryparam1=new_bind_params(1)
print(type(queryparam1)) print(type(queryparam1))

View File

@ -82,7 +82,7 @@ class TDTestCase:
return con return con
def test_stmt_set_tbname_tag(self,conn): def test_stmt_set_tbname_tag(self,conn):
dbname = "pytest_taos_stmt_set_tbname_tag" dbname = "stmt_set_tbname_tag"
try: try:
conn.execute("drop database if exists %s" % dbname) conn.execute("drop database if exists %s" % dbname)
@ -99,7 +99,7 @@ class TDTestCase:
tags = new_bind_params(16) tags = new_bind_params(16)
tags[0].timestamp(1626861392589123, PrecisionEnum.Microseconds) tags[0].timestamp(1626861392589123, PrecisionEnum.Microseconds)
tags[1].bool(True) tags[1].bool(True)
tags[2].null() tags[2].bool(False)
tags[3].tinyint(2) tags[3].tinyint(2)
tags[4].smallint(3) tags[4].smallint(3)
tags[5].int(4) tags[5].int(4)
@ -114,7 +114,7 @@ class TDTestCase:
tags[14].nchar("stmt") tags[14].nchar("stmt")
tags[15].timestamp(1626861392589, PrecisionEnum.Milliseconds) tags[15].timestamp(1626861392589, PrecisionEnum.Milliseconds)
stmt.set_tbname_tags("tb1", tags) stmt.set_tbname_tags("tb1", tags)
params = new_multi_binds(16) params = new_multi_binds(17)
params[0].timestamp((1626861392589111, 1626861392590111, 1626861392591111)) params[0].timestamp((1626861392589111, 1626861392590111, 1626861392591111))
params[1].bool((True, None, False)) params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null params[2].tinyint([-128, -128, None]) # -128 is tinyint null
@ -129,28 +129,153 @@ class TDTestCase:
params[11].float([3, None, 1]) params[11].float([3, None, 1])
params[12].double([3, None, 1.2]) params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None]) params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) params[14].nchar(["涛思数据", None, "a long string with 中文?字符"])
params[15].timestamp([None, None, 1626861392591]) params[15].timestamp([None, None, 1626861392591])
params[16].binary(["涛思数据16", None, "a long string with 中文-字符"]) params[16].binary(["涛思数据16", None, None])
stmt.bind_param_batch(params) stmt.bind_param_batch(params)
stmt.execute() stmt.execute()
assert stmt.affected_rows == 3 assert stmt.affected_rows == 3
#query #query all
querystmt1=conn.statement("select * from log where bu < ?") querystmt1=conn.statement("select * from log where bu < ?")
queryparam1=new_bind_params(1) queryparam1=new_bind_params(1)
print(type(queryparam1)) print(type(queryparam1))
queryparam1[0].int(5) queryparam1[0].int(10)
querystmt1.bind_param(queryparam1) querystmt1.bind_param(queryparam1)
querystmt1.execute() querystmt1.execute()
result1=querystmt1.use_result() result1=querystmt1.use_result()
rows1=result1.fetch_all() rows1=result1.fetch_all()
print(rows1) print(rows1[0])
# assert str(rows1[0][0]) == "2021-07-21 17:56:32.589111" print(rows1[1])
# assert rows1[0][10] == 3 print(rows1[2])
# assert rows1[1][10] == 4 assert str(rows1[0][0]) == "2021-07-21 17:56:32.589111"
assert rows1[0][10] == 3
assert rows1[1][10] == 4
#query: Numeric Functions
querystmt2=conn.statement("select abs(?) from log where bu < ?")
queryparam2=new_bind_params(2)
print(type(queryparam2))
queryparam2[0].int(5)
queryparam2[1].int(5)
querystmt2.bind_param(queryparam2)
querystmt2.execute()
result2=querystmt2.use_result()
rows2=result2.fetch_all()
print("2",rows2)
assert rows2[0][0] == 5
assert rows2[1][0] == 5
#query: Numeric Functions and escapes
querystmt3=conn.statement("select abs(?) from log where nn= 'a? long string with 中文字符' ")
queryparam3=new_bind_params(1)
print(type(queryparam3))
queryparam3[0].int(5)
querystmt3.bind_param(queryparam3)
querystmt3.execute()
result3=querystmt3.use_result()
rows3=result3.fetch_all()
print("3",rows3)
assert rows3 == []
# #query: string Functions
# querystmt3=conn.statement("select CHAR_LENGTH(?) from log ")
# queryparam3=new_bind_params(1)
# print(type(queryparam3))
# queryparam3[0].binary('中文字符')
# querystmt3.bind_param(queryparam3)
# querystmt3.execute()
# result3=querystmt3.use_result()
# rows3=result3.fetch_all()
# print("4",rows3)
# assert rows3[0][0] == 12, 'fourth case is failed'
# assert rows3[1][0] == 12, 'fourth case is failed'
# #query: conversion Functions
# querystmt4=conn.statement("select cast( ? as bigint) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].binary('1232a')
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("5",rows4)
# assert rows4[0][0] == 1232
# assert rows4[1][0] == 1232
# querystmt4=conn.statement("select cast( ? as binary(10)) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].int(123)
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("6",rows4)
# assert rows4[0][0] == '123'
# assert rows4[1][0] == '123'
# #query: datatime Functions
# querystmt4=conn.statement(" select timediff('2021-07-21 17:56:32.590111',?,1s) from log ")
# queryparam4=new_bind_params(1)
# print(type(queryparam4))
# queryparam4[0].timestamp(1626861392591111)
# querystmt4.bind_param(queryparam4)
# querystmt4.execute()
# result4=querystmt4.use_result()
# rows4=result4.fetch_all()
# print("7",rows4)
# assert rows4[0][0] == 1, 'seventh case is failed'
# assert rows4[1][0] == 1, 'seventh case is failed'
#query: aggregate Functions
querystmt4=conn.statement(" select count(?) from log ")
queryparam4=new_bind_params(1)
print(type(queryparam4))
queryparam4[0].int(123)
querystmt4.bind_param(queryparam4)
querystmt4.execute()
result4=querystmt4.use_result()
rows4=result4.fetch_all()
print("8",rows4)
assert rows4[0][0] == 3, ' 8 case is failed'
#query: selector Functions 9
querystmt4=conn.statement(" select bottom(bu,?) from log group by bu ; ")
queryparam4=new_bind_params(1)
print(type(queryparam4))
queryparam4[0].int(2)
querystmt4.bind_param(queryparam4)
querystmt4.execute()
result4=querystmt4.use_result()
rows4=result4.fetch_all()
print("9",rows4)
assert rows4[0][0] == 4, ' 9 case is failed'
assert rows4[1][0] == 3, ' 9 case is failed'
# #query: time-series specific Functions 10
querystmt4=conn.statement(" select twa(?) from log; ")
queryparam4=new_bind_params(1)
print(type(queryparam4))
queryparam4[0].int(15)
querystmt4.bind_param(queryparam4)
querystmt4.execute()
result4=querystmt4.use_result()
rows4=result4.fetch_all()
print("10",rows4)
assert rows4[0][0] == 15, ' 10 case is failed'
# conn.execute("drop database if exists %s" % dbname) # conn.execute("drop database if exists %s" % dbname)
conn.close() conn.close()