Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tkv
This commit is contained in:
commit
ca197e7113
|
@ -264,10 +264,10 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t updateType;
|
int8_t alterType;
|
||||||
int32_t numOfSchemas;
|
int32_t numOfSchemas;
|
||||||
SSchema pSchemas[];
|
SSchema pSchemas[];
|
||||||
} SMUpdateStbReq;
|
} SMAltertbReq;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t pid;
|
int32_t pid;
|
||||||
|
@ -1178,31 +1178,28 @@ typedef struct SVCreateTbReq {
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
} ntbCfg;
|
} ntbCfg;
|
||||||
};
|
};
|
||||||
} SVCreateTbReq;
|
} SVCreateTbReq, SVUpdateTbReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
} SVCreateTbRsp, SVUpdateTbRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
||||||
|
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
|
||||||
|
int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp);
|
||||||
|
void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t ver; // use a general definition
|
uint64_t ver; // use a general definition
|
||||||
SArray* pArray;
|
SArray* pArray;
|
||||||
} SVCreateTbBatchReq;
|
} SVCreateTbBatchReq;
|
||||||
|
|
||||||
int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
|
|
||||||
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
|
|
||||||
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
|
|
||||||
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
} SVCreateTbBatchRsp;
|
||||||
} SVCreateTbRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
|
||||||
SMsgHead head;
|
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
int32_t tSerializeSVCreateTbBatchReqp(void** buf, SVCreateTbBatchReq* pRsp);
|
||||||
int8_t ignoreNotExists;
|
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pRsp);
|
||||||
} SVAlterTbReq;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMsgHead head;
|
|
||||||
} SVAlterTbRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint64_t ver;
|
uint64_t ver;
|
||||||
|
|
|
@ -293,7 +293,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
|
int tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
|
|
||||||
tlen += taosEncodeFixedU64(buf, pReq->ver);
|
tlen += taosEncodeFixedU64(buf, pReq->ver);
|
||||||
|
@ -306,7 +306,7 @@ int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
|
void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
|
||||||
uint32_t nsize = 0;
|
uint32_t nsize = 0;
|
||||||
|
|
||||||
buf = taosDecodeFixedU64(buf, &pReq->ver);
|
buf = taosDecodeFixedU64(buf, &pReq->ver);
|
||||||
|
|
|
@ -220,15 +220,69 @@ TEST_F(DndTestVnode, 03_Create_Stb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestVnode, 04_ALTER_Stb) {
|
TEST_F(DndTestVnode, 04_ALTER_Stb) {
|
||||||
#if 0
|
for (int i = 0; i < 1; ++i) {
|
||||||
{
|
SVCreateTbReq req = {0};
|
||||||
for (int i = 0; i < 3; ++i) {
|
req.ver = 0;
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, pReq, contLen);
|
req.name = (char*)"stb1";
|
||||||
ASSERT_NE(pRsp, nullptr);
|
req.ttl = 0;
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
req.keep = 0;
|
||||||
|
req.type = TD_SUPER_TABLE;
|
||||||
|
|
||||||
|
SSchema schemas[5] = {0};
|
||||||
|
{
|
||||||
|
SSchema* pSchema = &schemas[0];
|
||||||
|
pSchema->bytes = htonl(8);
|
||||||
|
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
strcpy(pSchema->name, "ts");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SSchema* pSchema = &schemas[1];
|
||||||
|
pSchema->bytes = htonl(4);
|
||||||
|
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||||
|
strcpy(pSchema->name, "col1");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SSchema* pSchema = &schemas[2];
|
||||||
|
pSchema->bytes = htonl(2);
|
||||||
|
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
||||||
|
strcpy(pSchema->name, "_tag1");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SSchema* pSchema = &schemas[3];
|
||||||
|
pSchema->bytes = htonl(8);
|
||||||
|
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
strcpy(pSchema->name, "_tag2");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SSchema* pSchema = &schemas[4];
|
||||||
|
pSchema->bytes = htonl(16);
|
||||||
|
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema->name, "_tag3");
|
||||||
|
}
|
||||||
|
|
||||||
|
req.stbCfg.suid = 9527;
|
||||||
|
req.stbCfg.nCols = 2;
|
||||||
|
req.stbCfg.pSchema = &schemas[0];
|
||||||
|
req.stbCfg.nTagCols = 3;
|
||||||
|
req.stbCfg.pTagSchema = &schemas[2];
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
|
||||||
|
SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen);
|
||||||
|
|
||||||
|
pHead->contLen = htonl(contLen);
|
||||||
|
pHead->vgId = htonl(2);
|
||||||
|
|
||||||
|
void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
|
||||||
|
tSerializeSVCreateTbReq(&pBuf, &req);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, (void*)pHead, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DndTestVnode, 05_DROP_Stb) {
|
TEST_F(DndTestVnode, 05_DROP_Stb) {
|
||||||
|
|
|
@ -33,10 +33,10 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
||||||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
|
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
|
||||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
|
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
|
||||||
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
|
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq);
|
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
|
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
|
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
|
||||||
static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp);
|
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp);
|
||||||
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
|
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
|
||||||
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
|
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
|
||||||
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
|
@ -53,10 +53,10 @@ int32_t mndInitStb(SMnode *pMnode) {
|
||||||
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMUpdateStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVUpdateStbRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
|
||||||
|
|
||||||
|
@ -193,6 +193,8 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
||||||
|
|
||||||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
||||||
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
|
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
|
||||||
|
tfree(pStb->pColumns);
|
||||||
|
tfree(pStb->pTags);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,10 +204,10 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
taosWLockLatch(&pOld->lock);
|
taosWLockLatch(&pOld->lock);
|
||||||
|
|
||||||
if (pOld->numOfColumns < pNew->numOfColumns) {
|
if (pOld->numOfColumns < pNew->numOfColumns) {
|
||||||
void *pSchema = malloc(pOld->numOfColumns * sizeof(SSchema));
|
void *pColumns = malloc(pNew->numOfColumns * sizeof(SSchema));
|
||||||
if (pSchema != NULL) {
|
if (pColumns != NULL) {
|
||||||
free(pOld->pColumns);
|
free(pOld->pColumns);
|
||||||
pOld->pColumns = pSchema;
|
pOld->pColumns = pColumns;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||||
|
@ -214,10 +216,10 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOld->numOfTags < pNew->numOfTags) {
|
if (pOld->numOfTags < pNew->numOfTags) {
|
||||||
void *pSchema = malloc(pOld->numOfTags * sizeof(SSchema));
|
void *pTags = malloc(pNew->numOfTags * sizeof(SSchema));
|
||||||
if (pSchema != NULL) {
|
if (pTags != NULL) {
|
||||||
free(pOld->pTags);
|
free(pOld->pTags);
|
||||||
pOld->pTags = pSchema;
|
pOld->pTags = pTags;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||||
|
@ -575,11 +577,11 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckUpdateStbReq(SMUpdateStbReq *pUpdate) {
|
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
|
||||||
pUpdate->numOfSchemas = htonl(pUpdate->numOfSchemas);
|
pAlter->numOfSchemas = htonl(pAlter->numOfSchemas);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pUpdate->numOfSchemas; ++i) {
|
for (int32_t i = 0; i < pAlter->numOfSchemas; ++i) {
|
||||||
SSchema *pSchema = &pUpdate->pSchemas[i];
|
SSchema *pSchema = &pAlter->pSchemas[i];
|
||||||
pSchema->colId = htonl(pSchema->colId);
|
pSchema->colId = htonl(pSchema->colId);
|
||||||
pSchema->bytes = htonl(pSchema->bytes);
|
pSchema->bytes = htonl(pSchema->bytes);
|
||||||
|
|
||||||
|
@ -696,8 +698,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName,
|
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName, const char *newTagName) {
|
||||||
const char *newTagName) {
|
|
||||||
int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
|
int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
|
||||||
if (tag < 0) {
|
if (tag < 0) {
|
||||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||||
|
@ -727,7 +728,7 @@ static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const cha
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||||
int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name);
|
int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name);
|
||||||
if (tag < 0) {
|
if (tag < 0) {
|
||||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||||
|
@ -810,7 +811,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||||
int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name);
|
int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name);
|
||||||
if (col < 0) {
|
if (col < 0) {
|
||||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||||
|
@ -849,17 +850,16 @@ static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
static int32_t mndSetUpdateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
|
||||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||||
if (pRedoRaw == NULL) return -1;
|
if (pRedoRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||||
if (pCommitRaw == NULL) return -1;
|
if (pCommitRaw == NULL) return -1;
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
@ -868,8 +868,7 @@ static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||||
static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -892,7 +891,7 @@ static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_VND_CREATE_STB;
|
action.msgType = TDMT_VND_ALTER_STB;
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(pReq);
|
free(pReq);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
@ -905,7 +904,7 @@ static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbReq *pUpdate, SDbObj *pDb, SStbObj *pOld) {
|
static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||||
SStbObj stbObj = {0};
|
SStbObj stbObj = {0};
|
||||||
taosRLockLatch(&pOld->lock);
|
taosRLockLatch(&pOld->lock);
|
||||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||||
|
@ -916,93 +915,93 @@ static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbRe
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
switch (pUpdate->updateType) {
|
switch (pAlter->alterType) {
|
||||||
case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
|
case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
|
||||||
code = mndAddSuperTableTag(pOld, &stbObj, pUpdate->pSchemas, 1);
|
code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchemas, 1);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
|
case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
|
||||||
code = mndDropSuperTableTag(pOld, &stbObj, pUpdate->pSchemas[0].name);
|
code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchemas[0].name);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
||||||
code = mndUpdateStbTagName(pOld, &stbObj, pUpdate->pSchemas[0].name, pUpdate->pSchemas[1].name);
|
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pSchemas[0].name, pAlter->pSchemas[1].name);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
||||||
code = mndUpdateStbTagBytes(pOld, &stbObj, &pUpdate->pSchemas[0]);
|
code = mndAlterStbTagBytes(pOld, &stbObj, &pAlter->pSchemas[0]);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||||
code = mndAddSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas, 1);
|
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchemas, 1);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||||
code = mndDropSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas[0].name);
|
code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchemas[0].name);
|
||||||
break;
|
break;
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||||
code = mndUpdateStbColumnBytes(pOld, &stbObj, &pUpdate->pSchemas[0]);
|
code = mndAlterStbColumnBytes(pOld, &stbObj, &pAlter->pSchemas[0]);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != 0) goto UPDATE_STB_OVER;
|
if (code != 0) goto ALTER_STB_OVER;
|
||||||
|
|
||||||
code = -1;
|
code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto UPDATE_STB_OVER;
|
if (pTrans == NULL) goto ALTER_STB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to update stb:%s", pTrans->id, pUpdate->name);
|
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||||
|
|
||||||
if (mndSetUpdateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||||
if (mndSetUpdateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||||
if (mndSetUpdateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_STB_OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto ALTER_STB_OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
UPDATE_STB_OVER:
|
ALTER_STB_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tfree(stbObj.pTags);
|
tfree(stbObj.pTags);
|
||||||
tfree(stbObj.pColumns);
|
tfree(stbObj.pColumns);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq) {
|
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SMUpdateStbReq *pUpdate = pReq->rpcMsg.pCont;
|
SMAltertbReq *pAlter = pReq->rpcMsg.pCont;
|
||||||
|
|
||||||
mDebug("stb:%s, start to update", pUpdate->name);
|
mDebug("stb:%s, start to alter", pAlter->name);
|
||||||
|
|
||||||
if (mndCheckUpdateStbReq(pUpdate) != 0) {
|
if (mndCheckAlterStbReq(pAlter) != 0) {
|
||||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStbObj *pStb = mndAcquireStb(pMnode, pUpdate->name);
|
SStbObj *pStb = mndAcquireStb(pMnode, pAlter->name);
|
||||||
if (pStb == NULL) {
|
if (pStb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pUpdate->name);
|
SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndUpdateStb(pMnode, pReq, pUpdate, pDb, pStb);
|
int32_t code = mndAlterStb(pMnode, pReq, pAlter, pDb, pStb);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("stb:%s, failed to update since %s", pUpdate->name, tstrerror(code));
|
mError("stb:%s, failed to alter since %s", pAlter->name, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp) {
|
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pRsp);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,213 +21,289 @@ class MndTestStb : public ::testing::Test {
|
||||||
public:
|
public:
|
||||||
void SetUp() override {}
|
void SetUp() override {}
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
|
|
||||||
|
SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
|
||||||
|
SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
|
||||||
|
SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen);
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase MndTestStb::test;
|
Testbase MndTestStb::test;
|
||||||
|
|
||||||
TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
SCreateDbReq* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
||||||
{
|
int32_t contLen = sizeof(SCreateDbReq);
|
||||||
int32_t contLen = sizeof(SCreateDbReq);
|
|
||||||
|
|
||||||
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
|
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
|
||||||
strcpy(pReq->db, "1.d1");
|
strcpy(pReq->db, dbname);
|
||||||
pReq->numOfVgroups = htonl(2);
|
pReq->numOfVgroups = htonl(2);
|
||||||
pReq->cacheBlockSize = htonl(16);
|
pReq->cacheBlockSize = htonl(16);
|
||||||
pReq->totalBlocks = htonl(10);
|
pReq->totalBlocks = htonl(10);
|
||||||
pReq->daysPerFile = htonl(10);
|
pReq->daysPerFile = htonl(10);
|
||||||
pReq->daysToKeep0 = htonl(3650);
|
pReq->daysToKeep0 = htonl(3650);
|
||||||
pReq->daysToKeep1 = htonl(3650);
|
pReq->daysToKeep1 = htonl(3650);
|
||||||
pReq->daysToKeep2 = htonl(3650);
|
pReq->daysToKeep2 = htonl(3650);
|
||||||
pReq->minRows = htonl(100);
|
pReq->minRows = htonl(100);
|
||||||
pReq->maxRows = htonl(4096);
|
pReq->maxRows = htonl(4096);
|
||||||
pReq->commitTime = htonl(3600);
|
pReq->commitTime = htonl(3600);
|
||||||
pReq->fsyncPeriod = htonl(3000);
|
pReq->fsyncPeriod = htonl(3000);
|
||||||
pReq->walLevel = 1;
|
pReq->walLevel = 1;
|
||||||
pReq->precision = 0;
|
pReq->precision = 0;
|
||||||
pReq->compression = 2;
|
pReq->compression = 2;
|
||||||
pReq->replications = 1;
|
pReq->replications = 1;
|
||||||
pReq->quorum = 1;
|
pReq->quorum = 1;
|
||||||
pReq->update = 0;
|
pReq->update = 0;
|
||||||
pReq->cacheLastRow = 0;
|
pReq->cacheLastRow = 0;
|
||||||
pReq->ignoreExist = 1;
|
pReq->ignoreExist = 1;
|
||||||
|
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
*pContLen = contLen;
|
||||||
ASSERT_NE(pRsp, nullptr);
|
return pReq;
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
}
|
||||||
}
|
|
||||||
|
SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char *stbname, int32_t* pContLen) {
|
||||||
{
|
int32_t cols = 2;
|
||||||
int32_t cols = 2;
|
int32_t tags = 3;
|
||||||
int32_t tags = 3;
|
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
|
||||||
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
|
|
||||||
|
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
|
||||||
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
|
strcpy(pReq->name, stbname);
|
||||||
strcpy(pReq->name, "1.d1.stb");
|
pReq->numOfTags = htonl(tags);
|
||||||
pReq->numOfTags = htonl(tags);
|
pReq->numOfColumns = htonl(cols);
|
||||||
pReq->numOfColumns = htonl(cols);
|
|
||||||
|
{
|
||||||
{
|
SSchema* pSchema = &pReq->pSchemas[0];
|
||||||
SSchema* pSchema = &pReq->pSchemas[0];
|
pSchema->bytes = htonl(8);
|
||||||
pSchema->bytes = htonl(8);
|
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
strcpy(pSchema->name, "ts");
|
||||||
strcpy(pSchema->name, "ts");
|
}
|
||||||
}
|
|
||||||
|
{
|
||||||
{
|
SSchema* pSchema = &pReq->pSchemas[1];
|
||||||
SSchema* pSchema = &pReq->pSchemas[1];
|
pSchema->bytes = htonl(4);
|
||||||
pSchema->bytes = htonl(4);
|
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||||
pSchema->type = TSDB_DATA_TYPE_INT;
|
strcpy(pSchema->name, "col1");
|
||||||
strcpy(pSchema->name, "col1");
|
}
|
||||||
}
|
|
||||||
|
{
|
||||||
{
|
SSchema* pSchema = &pReq->pSchemas[2];
|
||||||
SSchema* pSchema = &pReq->pSchemas[2];
|
pSchema->bytes = htonl(2);
|
||||||
pSchema->bytes = htonl(2);
|
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
||||||
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
strcpy(pSchema->name, "tag1");
|
||||||
strcpy(pSchema->name, "tag1");
|
}
|
||||||
}
|
|
||||||
|
{
|
||||||
{
|
SSchema* pSchema = &pReq->pSchemas[3];
|
||||||
SSchema* pSchema = &pReq->pSchemas[3];
|
pSchema->bytes = htonl(8);
|
||||||
pSchema->bytes = htonl(8);
|
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
||||||
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
strcpy(pSchema->name, "tag2");
|
||||||
strcpy(pSchema->name, "tag2");
|
}
|
||||||
}
|
|
||||||
|
{
|
||||||
{
|
SSchema* pSchema = &pReq->pSchemas[4];
|
||||||
SSchema* pSchema = &pReq->pSchemas[4];
|
pSchema->bytes = htonl(16);
|
||||||
pSchema->bytes = htonl(16);
|
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
||||||
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
strcpy(pSchema->name, "tag3");
|
||||||
strcpy(pSchema->name, "tag3");
|
}
|
||||||
}
|
|
||||||
|
*pContLen = contLen;
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
return pReq;
|
||||||
ASSERT_NE(pRsp, nullptr);
|
}
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
|
||||||
}
|
// TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
|
// const char *dbname = "1.d1";
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
|
// const char *stbname = "1.d1.stb";
|
||||||
CHECK_META("show stables", 4);
|
|
||||||
|
// {
|
||||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
|
// int32_t contLen = 0;
|
||||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
// SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen);
|
||||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
|
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||||
CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
|
// ASSERT_NE(pRsp, nullptr);
|
||||||
|
// ASSERT_EQ(pRsp->code, 0);
|
||||||
test.SendShowRetrieveReq();
|
// }
|
||||||
EXPECT_EQ(test.GetShowRows(), 1);
|
|
||||||
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
// {
|
||||||
CheckTimestamp();
|
// int32_t contLen = 0;
|
||||||
CheckInt32(2);
|
// SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
CheckInt32(3);
|
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
|
// ASSERT_NE(pRsp, nullptr);
|
||||||
// ----- meta ------
|
// ASSERT_EQ(pRsp->code, 0);
|
||||||
{
|
// }
|
||||||
int32_t contLen = sizeof(STableInfoReq);
|
|
||||||
|
// {
|
||||||
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||||
strcpy(pReq->dbFName, "1.d1");
|
// CHECK_META("show stables", 4);
|
||||||
strcpy(pReq->tbName, "stb");
|
// CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
|
||||||
|
// CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
// CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
|
||||||
ASSERT_NE(pMsg, nullptr);
|
// CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
|
||||||
ASSERT_EQ(pMsg->code, 0);
|
|
||||||
|
// test.SendShowRetrieveReq();
|
||||||
STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
|
// EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
pRsp->numOfTags = htonl(pRsp->numOfTags);
|
// CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||||
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
|
// CheckTimestamp();
|
||||||
pRsp->sversion = htonl(pRsp->sversion);
|
// CheckInt32(2);
|
||||||
pRsp->tversion = htonl(pRsp->tversion);
|
// CheckInt32(3);
|
||||||
pRsp->suid = be64toh(pRsp->suid);
|
// }
|
||||||
pRsp->tuid = be64toh(pRsp->tuid);
|
|
||||||
pRsp->vgId = be64toh(pRsp->vgId);
|
// // ----- meta ------
|
||||||
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
|
// {
|
||||||
SSchema* pSchema = &pRsp->pSchema[i];
|
// int32_t contLen = sizeof(STableInfoReq);
|
||||||
pSchema->colId = htonl(pSchema->colId);
|
// STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
||||||
pSchema->bytes = htonl(pSchema->bytes);
|
// strcpy(pReq->dbFName, dbname);
|
||||||
}
|
// strcpy(pReq->tbName, "stb");
|
||||||
|
|
||||||
EXPECT_STREQ(pRsp->dbFName, "1.d1");
|
// SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
||||||
EXPECT_STREQ(pRsp->tbName, "stb");
|
// ASSERT_NE(pMsg, nullptr);
|
||||||
EXPECT_STREQ(pRsp->stbName, "stb");
|
// ASSERT_EQ(pMsg->code, 0);
|
||||||
EXPECT_EQ(pRsp->numOfColumns, 2);
|
|
||||||
EXPECT_EQ(pRsp->numOfTags, 3);
|
// STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
|
||||||
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
// pRsp->numOfTags = htonl(pRsp->numOfTags);
|
||||||
EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
|
// pRsp->numOfColumns = htonl(pRsp->numOfColumns);
|
||||||
EXPECT_EQ(pRsp->update, 0);
|
// pRsp->sversion = htonl(pRsp->sversion);
|
||||||
EXPECT_EQ(pRsp->sversion, 1);
|
// pRsp->tversion = htonl(pRsp->tversion);
|
||||||
EXPECT_EQ(pRsp->tversion, 0);
|
// pRsp->suid = be64toh(pRsp->suid);
|
||||||
EXPECT_GT(pRsp->suid, 0);
|
// pRsp->tuid = be64toh(pRsp->tuid);
|
||||||
EXPECT_GT(pRsp->tuid, 0);
|
// pRsp->vgId = be64toh(pRsp->vgId);
|
||||||
EXPECT_EQ(pRsp->vgId, 0);
|
// for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
|
||||||
|
// SSchema* pSchema = &pRsp->pSchema[i];
|
||||||
{
|
// pSchema->colId = htonl(pSchema->colId);
|
||||||
SSchema* pSchema = &pRsp->pSchema[0];
|
// pSchema->bytes = htonl(pSchema->bytes);
|
||||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
|
// }
|
||||||
EXPECT_EQ(pSchema->colId, 1);
|
|
||||||
EXPECT_EQ(pSchema->bytes, 8);
|
// EXPECT_STREQ(pRsp->dbFName, dbname);
|
||||||
EXPECT_STREQ(pSchema->name, "ts");
|
// EXPECT_STREQ(pRsp->tbName, "stb");
|
||||||
}
|
// EXPECT_STREQ(pRsp->stbName, "stb");
|
||||||
|
// EXPECT_EQ(pRsp->numOfColumns, 2);
|
||||||
{
|
// EXPECT_EQ(pRsp->numOfTags, 3);
|
||||||
SSchema* pSchema = &pRsp->pSchema[1];
|
// EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
|
// EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
|
||||||
EXPECT_EQ(pSchema->colId, 2);
|
// EXPECT_EQ(pRsp->update, 0);
|
||||||
EXPECT_EQ(pSchema->bytes, 4);
|
// EXPECT_EQ(pRsp->sversion, 1);
|
||||||
EXPECT_STREQ(pSchema->name, "col1");
|
// EXPECT_EQ(pRsp->tversion, 0);
|
||||||
}
|
// EXPECT_GT(pRsp->suid, 0);
|
||||||
|
// EXPECT_GT(pRsp->tuid, 0);
|
||||||
{
|
// EXPECT_EQ(pRsp->vgId, 0);
|
||||||
SSchema* pSchema = &pRsp->pSchema[2];
|
|
||||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT);
|
// {
|
||||||
EXPECT_EQ(pSchema->colId, 3);
|
// SSchema* pSchema = &pRsp->pSchema[0];
|
||||||
EXPECT_EQ(pSchema->bytes, 2);
|
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
EXPECT_STREQ(pSchema->name, "tag1");
|
// EXPECT_EQ(pSchema->colId, 1);
|
||||||
}
|
// EXPECT_EQ(pSchema->bytes, 8);
|
||||||
|
// EXPECT_STREQ(pSchema->name, "ts");
|
||||||
{
|
// }
|
||||||
SSchema* pSchema = &pRsp->pSchema[3];
|
|
||||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT);
|
// {
|
||||||
EXPECT_EQ(pSchema->colId, 4);
|
// SSchema* pSchema = &pRsp->pSchema[1];
|
||||||
EXPECT_EQ(pSchema->bytes, 8);
|
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
|
||||||
EXPECT_STREQ(pSchema->name, "tag2");
|
// EXPECT_EQ(pSchema->colId, 2);
|
||||||
}
|
// EXPECT_EQ(pSchema->bytes, 4);
|
||||||
|
// EXPECT_STREQ(pSchema->name, "col1");
|
||||||
{
|
// }
|
||||||
SSchema* pSchema = &pRsp->pSchema[4];
|
|
||||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
|
// {
|
||||||
EXPECT_EQ(pSchema->colId, 5);
|
// SSchema* pSchema = &pRsp->pSchema[2];
|
||||||
EXPECT_EQ(pSchema->bytes, 16);
|
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT);
|
||||||
EXPECT_STREQ(pSchema->name, "tag3");
|
// EXPECT_EQ(pSchema->colId, 3);
|
||||||
}
|
// EXPECT_EQ(pSchema->bytes, 2);
|
||||||
}
|
// EXPECT_STREQ(pSchema->name, "tag1");
|
||||||
|
// }
|
||||||
// restart
|
|
||||||
test.Restart();
|
// {
|
||||||
|
// SSchema* pSchema = &pRsp->pSchema[3];
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
|
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT);
|
||||||
CHECK_META("show stables", 4);
|
// EXPECT_EQ(pSchema->colId, 4);
|
||||||
test.SendShowRetrieveReq();
|
// EXPECT_EQ(pSchema->bytes, 8);
|
||||||
EXPECT_EQ(test.GetShowRows(), 1);
|
// EXPECT_STREQ(pSchema->name, "tag2");
|
||||||
|
// }
|
||||||
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
|
||||||
CheckTimestamp();
|
// {
|
||||||
CheckInt32(2);
|
// SSchema* pSchema = &pRsp->pSchema[4];
|
||||||
CheckInt32(3);
|
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
|
||||||
|
// EXPECT_EQ(pSchema->colId, 5);
|
||||||
{
|
// EXPECT_EQ(pSchema->bytes, 16);
|
||||||
int32_t contLen = sizeof(SMDropStbReq);
|
// EXPECT_STREQ(pSchema->name, "tag3");
|
||||||
|
// }
|
||||||
SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
|
// }
|
||||||
strcpy(pReq->name, "1.d1.stb");
|
|
||||||
|
// // restart
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
// test.Restart();
|
||||||
ASSERT_NE(pRsp, nullptr);
|
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
// {
|
||||||
}
|
// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||||
|
// CHECK_META("show stables", 4);
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
|
// test.SendShowRetrieveReq();
|
||||||
CHECK_META("show stables", 4);
|
// EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
test.SendShowRetrieveReq();
|
|
||||||
EXPECT_EQ(test.GetShowRows(), 0);
|
// CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||||
|
// CheckTimestamp();
|
||||||
|
// CheckInt32(2);
|
||||||
|
// CheckInt32(3);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// {
|
||||||
|
// int32_t contLen = sizeof(SMDropStbReq);
|
||||||
|
|
||||||
|
// SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
|
||||||
|
// strcpy(pReq->name, stbname);
|
||||||
|
|
||||||
|
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
||||||
|
// ASSERT_NE(pRsp, nullptr);
|
||||||
|
// ASSERT_EQ(pRsp->code, 0);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||||
|
// CHECK_META("show stables", 4);
|
||||||
|
// test.SendShowRetrieveReq();
|
||||||
|
// EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
|
// }
|
||||||
|
|
||||||
|
SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen) {
|
||||||
|
int32_t contLen = sizeof(SMAltertbReq) + sizeof(SSchema);
|
||||||
|
SMAltertbReq* pReq = (SMAltertbReq*)rpcMallocCont(contLen);
|
||||||
|
strcpy(pReq->name, stbname);
|
||||||
|
pReq->numOfSchemas = htonl(1);
|
||||||
|
pReq->alterType = TSDB_ALTER_TABLE_ADD_TAG_COLUMN;
|
||||||
|
|
||||||
|
SSchema* pSchema = &pReq->pSchemas[0];
|
||||||
|
pSchema->bytes = htonl(4);
|
||||||
|
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||||
|
strcpy(pSchema->name, "tag4");
|
||||||
|
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(MndTestStb, 01_Alter_Stb) {
|
||||||
|
const char *dbname = "1.d2";
|
||||||
|
const char *stbname = "1.d2.stb";
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
SMAltertbReq* pReq = BuildAlterStbAddTagReq(stbname, &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
|
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckInt32(2);
|
||||||
|
CheckInt32(4);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,7 +83,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
free(vCreateTbReq.name);
|
free(vCreateTbReq.name);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_CREATE_TABLE:
|
case TDMT_VND_CREATE_TABLE:
|
||||||
tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
|
||||||
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
|
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
|
||||||
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
|
||||||
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
||||||
|
@ -106,7 +106,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TDMT_VND_ALTER_STB:
|
case TDMT_VND_ALTER_STB:
|
||||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||||
|
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
||||||
|
free(vCreateTbReq.stbCfg.pSchema);
|
||||||
|
free(vCreateTbReq.stbCfg.pTagSchema);
|
||||||
|
free(vCreateTbReq.name);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_DROP_STB:
|
case TDMT_VND_DROP_STB:
|
||||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||||
|
|
|
@ -598,7 +598,7 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
|
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
|
||||||
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
|
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
|
||||||
void* buf = malloc(tlen);
|
void* buf = malloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
@ -608,7 +608,7 @@ static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArr
|
||||||
((SMsgHead*)buf)->contLen = htonl(tlen);
|
((SMsgHead*)buf)->contLen = htonl(tlen);
|
||||||
|
|
||||||
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
|
tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req));
|
||||||
|
|
||||||
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
|
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
|
||||||
pVgData->vg = pTbBatch->info;
|
pVgData->vg = pTbBatch->info;
|
||||||
|
|
|
@ -134,10 +134,12 @@ typedef struct {
|
||||||
// int16_t numOfTry; // number of try for different servers
|
// int16_t numOfTry; // number of try for different servers
|
||||||
// int8_t oldInUse; // server EP inUse passed by app
|
// int8_t oldInUse; // server EP inUse passed by app
|
||||||
// int8_t redirect; // flag to indicate redirect
|
// int8_t redirect; // flag to indicate redirect
|
||||||
int8_t connType; // connection type
|
int8_t connType; // connection type
|
||||||
int64_t rid; // refId returned by taosAddRef
|
int64_t rid; // refId returned by taosAddRef
|
||||||
SRpcMsg* pRsp; // for synchronous API
|
|
||||||
tsem_t* pSem; // for synchronous API
|
SRpcMsg* pRsp; // for synchronous API
|
||||||
|
tsem_t* pSem; // for synchronous API
|
||||||
|
|
||||||
char* ip;
|
char* ip;
|
||||||
uint32_t port;
|
uint32_t port;
|
||||||
// SEpSet* pSet; // for synchronous API
|
// SEpSet* pSet; // for synchronous API
|
||||||
|
|
|
@ -813,8 +813,8 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
|
||||||
SRpcInfo *pRpc = pContext->pRpc;
|
SRpcInfo *pRpc = pContext->pRpc;
|
||||||
SEpSet * pEpSet = &pContext->epSet;
|
SEpSet * pEpSet = &pContext->epSet;
|
||||||
|
|
||||||
pConn =
|
pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port,
|
||||||
rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
pContext->connType);
|
||||||
if (pConn == NULL || pConn->user[0] == 0) {
|
if (pConn == NULL || pConn->user[0] == 0) {
|
||||||
pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType);
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,17 +63,41 @@ void rpcFreeCont(void* cont) {
|
||||||
}
|
}
|
||||||
free((char*)cont - TRANS_MSG_OVERHEAD);
|
free((char*)cont - TRANS_MSG_OVERHEAD);
|
||||||
}
|
}
|
||||||
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
|
void* rpcReallocCont(void* ptr, int contLen) {
|
||||||
|
if (ptr == NULL) {
|
||||||
|
return rpcMallocCont(contLen);
|
||||||
|
}
|
||||||
|
char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
|
||||||
|
int sz = contLen + TRANS_MSG_OVERHEAD;
|
||||||
|
st = realloc(st, sz);
|
||||||
|
if (st == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return st + TRANS_MSG_OVERHEAD;
|
||||||
|
}
|
||||||
|
|
||||||
|
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
memset(&rpcMsg, 0, sizeof(rpcMsg));
|
||||||
|
|
||||||
|
rpcMsg.contLen = sizeof(SEpSet);
|
||||||
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
|
if (rpcMsg.pCont == NULL) return;
|
||||||
|
|
||||||
|
memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
|
||||||
|
|
||||||
|
rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
|
||||||
|
rpcMsg.handle = thandle;
|
||||||
|
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
|
|
||||||
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
|
|
||||||
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
|
|
||||||
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
||||||
void rpcCancelRequest(int64_t rid) { return; }
|
void rpcCancelRequest(int64_t rid) { return; }
|
||||||
|
|
||||||
int32_t rpcInit(void) {
|
int32_t rpcInit(void) {
|
||||||
// impl later
|
// impl later
|
||||||
return -1;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcCleanup(void) {
|
void rpcCleanup(void) {
|
||||||
|
|
|
@ -123,9 +123,14 @@ static void clientHandleResp(SCliConn* conn) {
|
||||||
rpcMsg.code = pHead->code;
|
rpcMsg.code = pHead->code;
|
||||||
rpcMsg.msgType = pHead->msgType;
|
rpcMsg.msgType = pHead->msgType;
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
|
if (pCtx->pSem == NULL) {
|
||||||
tDebug("conn %p handle resp", conn);
|
tDebug("conn %p handle resp", conn);
|
||||||
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
(pRpc->cfp)(NULL, &rpcMsg, NULL);
|
||||||
|
} else {
|
||||||
|
tDebug("conn %p handle resp", conn);
|
||||||
|
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
|
||||||
|
tsem_post(pCtx->pSem);
|
||||||
|
}
|
||||||
conn->notifyCount += 1;
|
conn->notifyCount += 1;
|
||||||
|
|
||||||
// buf's mem alread translated to rpcMsg.pCont
|
// buf's mem alread translated to rpcMsg.pCont
|
||||||
|
@ -159,14 +164,20 @@ static void clientHandleExcept(SCliConn* pConn) {
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.ahandle = pCtx->ahandle;
|
rpcMsg.ahandle = pCtx->ahandle;
|
||||||
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
if (pCtx->pSem == NULL) {
|
||||||
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
|
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
|
||||||
pConn->notifyCount += 1;
|
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
|
||||||
|
} else {
|
||||||
|
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
|
||||||
|
// SRpcMsg rpcMsg
|
||||||
|
tsem_post(pCtx->pSem);
|
||||||
|
}
|
||||||
|
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
pConn->data = NULL;
|
pConn->data = NULL;
|
||||||
// transDestroyConnCtx(pCtx);
|
// transDestroyConnCtx(pCtx);
|
||||||
clientConnDestroy(pConn, true);
|
clientConnDestroy(pConn, true);
|
||||||
|
pConn->notifyCount += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void clientTimeoutCb(uv_timer_t* handle) {
|
static void clientTimeoutCb(uv_timer_t* handle) {
|
||||||
|
@ -463,6 +474,7 @@ static void clientAsyncCb(uv_async_t* handle) {
|
||||||
|
|
||||||
static void* clientThread(void* arg) {
|
static void* clientThread(void* arg) {
|
||||||
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
SCliThrdObj* pThrd = (SCliThrdObj*)arg;
|
||||||
|
setThreadName("trans-client-work");
|
||||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -568,8 +580,8 @@ void taosCloseClient(void* arg) {
|
||||||
}
|
}
|
||||||
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
|
||||||
// impl later
|
// impl later
|
||||||
char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]);
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
uint32_t port = pEpSet->port[pEpSet->inUse];
|
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||||
|
|
||||||
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
|
@ -609,4 +621,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
|
||||||
// int end = taosGetTimestampUs() - start;
|
// int end = taosGetTimestampUs() - start;
|
||||||
// tError("client sent to rpc, time cost: %d", (int)end);
|
// tError("client sent to rpc, time cost: %d", (int)end);
|
||||||
}
|
}
|
||||||
|
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
|
||||||
|
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
|
||||||
|
uint32_t port = pEpSet->eps[pEpSet->inUse].port;
|
||||||
|
|
||||||
|
SRpcInfo* pRpc = (SRpcInfo*)shandle;
|
||||||
|
|
||||||
|
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
|
||||||
|
pCtx->pTransInst = (SRpcInfo*)shandle;
|
||||||
|
pCtx->ahandle = pReq->ahandle;
|
||||||
|
pCtx->msgType = pReq->msgType;
|
||||||
|
pCtx->ip = strdup(ip);
|
||||||
|
pCtx->port = port;
|
||||||
|
pCtx->pSem = calloc(1, sizeof(tsem_t));
|
||||||
|
pCtx->pRsp = pRsp;
|
||||||
|
tsem_init(pCtx->pSem, 0, 0);
|
||||||
|
|
||||||
|
int64_t index = pRpc->index;
|
||||||
|
if (pRpc->index++ >= pRpc->numOfThreads) {
|
||||||
|
pRpc->index = 0;
|
||||||
|
}
|
||||||
|
SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
|
||||||
|
cliMsg->ctx = pCtx;
|
||||||
|
cliMsg->msg = *pReq;
|
||||||
|
cliMsg->st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
|
||||||
|
|
||||||
|
// pthread_mutex_lock(&thrd->msgMtx);
|
||||||
|
// QUEUE_PUSH(&thrd->msg, &cliMsg->q);
|
||||||
|
// pthread_mutex_unlock(&thrd->msgMtx);
|
||||||
|
|
||||||
|
// int start = taosGetTimestampUs();
|
||||||
|
transSendAsync(thrd->asyncPool, &(cliMsg->q));
|
||||||
|
|
||||||
|
tsem_t* pSem = pCtx->pSem;
|
||||||
|
tsem_wait(pSem);
|
||||||
|
tsem_destroy(pSem);
|
||||||
|
free(pSem);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -33,6 +33,8 @@ typedef struct SSrvConn {
|
||||||
void* hostThrd;
|
void* hostThrd;
|
||||||
void* pSrvMsg;
|
void* pSrvMsg;
|
||||||
|
|
||||||
|
struct sockaddr peername;
|
||||||
|
|
||||||
// SRpcMsg sendMsg;
|
// SRpcMsg sendMsg;
|
||||||
// del later
|
// del later
|
||||||
char secured;
|
char secured;
|
||||||
|
@ -487,7 +489,13 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
uv_os_fd_t fd;
|
uv_os_fd_t fd;
|
||||||
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
||||||
tDebug("conn %p created, fd: %d", pConn, fd);
|
tDebug("conn %p created, fd: %d", pConn, fd);
|
||||||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
int namelen = sizeof(pConn->peername);
|
||||||
|
if (0 != uv_tcp_getpeername(pConn->pTcp, &pConn->peername, &namelen)) {
|
||||||
|
tError("failed to get peer name");
|
||||||
|
destroyConn(pConn, true);
|
||||||
|
} else {
|
||||||
|
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tDebug("failed to create new connection");
|
tDebug("failed to create new connection");
|
||||||
destroyConn(pConn, true);
|
destroyConn(pConn, true);
|
||||||
|
@ -496,6 +504,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
|
||||||
void* acceptThread(void* arg) {
|
void* acceptThread(void* arg) {
|
||||||
// opt
|
// opt
|
||||||
|
setThreadName("trans-accept");
|
||||||
SServerObj* srv = (SServerObj*)arg;
|
SServerObj* srv = (SServerObj*)arg;
|
||||||
uv_run(srv->loop, UV_RUN_DEFAULT);
|
uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
|
@ -548,6 +557,7 @@ static bool addHandleToAcceptloop(void* arg) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
void* workerThread(void* arg) {
|
void* workerThread(void* arg) {
|
||||||
|
setThreadName("trans-worker");
|
||||||
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
|
SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
|
||||||
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
uv_run(pThrd->loop, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
|
@ -723,4 +733,16 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
|
||||||
// uv_async_send(pThrd->workerAsync);
|
// uv_async_send(pThrd->workerAsync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) {
|
||||||
|
SSrvConn* pConn = thandle;
|
||||||
|
struct sockaddr* pPeerName = &pConn->peername;
|
||||||
|
|
||||||
|
struct sockaddr_in caddr = *(struct sockaddr_in*)(pPeerName);
|
||||||
|
pInfo->clientIp = (uint32_t)(caddr.sin_addr.s_addr);
|
||||||
|
pInfo->clientPort = ntohs(caddr.sin_port);
|
||||||
|
|
||||||
|
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -2,6 +2,7 @@ add_executable(transportTest "")
|
||||||
add_executable(client "")
|
add_executable(client "")
|
||||||
add_executable(server "")
|
add_executable(server "")
|
||||||
add_executable(transUT "")
|
add_executable(transUT "")
|
||||||
|
add_executable(syncClient "")
|
||||||
|
|
||||||
target_sources(transUT
|
target_sources(transUT
|
||||||
PRIVATE
|
PRIVATE
|
||||||
|
@ -20,6 +21,10 @@ target_sources (server
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"rserver.c"
|
"rserver.c"
|
||||||
)
|
)
|
||||||
|
target_sources (syncClient
|
||||||
|
PRIVATE
|
||||||
|
"syncClient.c"
|
||||||
|
)
|
||||||
|
|
||||||
target_include_directories(transportTest
|
target_include_directories(transportTest
|
||||||
PUBLIC
|
PUBLIC
|
||||||
|
@ -67,7 +72,6 @@ target_include_directories(transUT
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries (server
|
target_link_libraries (server
|
||||||
os
|
os
|
||||||
util
|
util
|
||||||
|
@ -75,4 +79,17 @@ target_link_libraries (server
|
||||||
gtest_main
|
gtest_main
|
||||||
transport
|
transport
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncClient
|
||||||
|
PUBLIC
|
||||||
|
"${CMAKE_SOURCE_DIR}/include/libs/transport"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_link_libraries (syncClient
|
||||||
|
os
|
||||||
|
util
|
||||||
|
common
|
||||||
|
gtest_main
|
||||||
|
transport
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,6 @@ typedef struct {
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
void * pRpc;
|
void * pRpc;
|
||||||
} SInfo;
|
} SInfo;
|
||||||
|
|
||||||
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
|
|
|
@ -0,0 +1,220 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include <sys/time.h>
|
||||||
|
|
||||||
|
#include <tep.h>
|
||||||
|
#include "os.h"
|
||||||
|
#include "rpcLog.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int index;
|
||||||
|
SEpSet epSet;
|
||||||
|
int num;
|
||||||
|
int numOfReqs;
|
||||||
|
int msgSize;
|
||||||
|
tsem_t rspSem;
|
||||||
|
tsem_t * pOverSem;
|
||||||
|
pthread_t thread;
|
||||||
|
void * pRpc;
|
||||||
|
} SInfo;
|
||||||
|
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||||
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
|
||||||
|
pMsg->code);
|
||||||
|
|
||||||
|
if (pEpSet) pInfo->epSet = *pEpSet;
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
// tsem_post(&pInfo->rspSem);
|
||||||
|
tsem_post(&pInfo->rspSem);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tcount = 0;
|
||||||
|
|
||||||
|
static void *sendRequest(void *param) {
|
||||||
|
SInfo * pInfo = (SInfo *)param;
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
|
||||||
|
tDebug("thread:%d, start to send request", pInfo->index);
|
||||||
|
|
||||||
|
tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs);
|
||||||
|
int u100 = 0;
|
||||||
|
int u500 = 0;
|
||||||
|
int u1000 = 0;
|
||||||
|
int u10000 = 0;
|
||||||
|
SRpcMsg respMsg = {0};
|
||||||
|
while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||||
|
pInfo->num++;
|
||||||
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
|
rpcMsg.ahandle = pInfo;
|
||||||
|
rpcMsg.msgType = 1;
|
||||||
|
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
|
int64_t start = taosGetTimestampUs();
|
||||||
|
rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &respMsg);
|
||||||
|
// rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL);
|
||||||
|
if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
|
// tsem_wait(&pInfo->rspSem);
|
||||||
|
// wtsem_wait(&pInfo->rspSem);
|
||||||
|
int64_t end = taosGetTimestampUs() - start;
|
||||||
|
if (end <= 100) {
|
||||||
|
u100++;
|
||||||
|
} else if (end > 100 && end <= 500) {
|
||||||
|
u500++;
|
||||||
|
} else if (end > 500 && end < 1000) {
|
||||||
|
u1000++;
|
||||||
|
} else {
|
||||||
|
u10000++;
|
||||||
|
}
|
||||||
|
|
||||||
|
tDebug("recv response succefully");
|
||||||
|
|
||||||
|
// usleep(100000000);
|
||||||
|
}
|
||||||
|
|
||||||
|
tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000);
|
||||||
|
tDebug("thread:%d, it is over", pInfo->index);
|
||||||
|
tcount++;
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
int msgSize = 128;
|
||||||
|
int numOfReqs = 0;
|
||||||
|
int appThreads = 1;
|
||||||
|
char serverIp[40] = "127.0.0.1";
|
||||||
|
char secret[20] = "mypassword";
|
||||||
|
struct timeval systemTime;
|
||||||
|
int64_t startTime, endTime;
|
||||||
|
pthread_attr_t thattr;
|
||||||
|
|
||||||
|
// server info
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, serverIp, 7000);
|
||||||
|
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
||||||
|
|
||||||
|
// client info
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "APP";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = processResponse;
|
||||||
|
rpcInit.sessions = 100;
|
||||||
|
rpcInit.idleTime = 100;
|
||||||
|
rpcInit.user = "michael";
|
||||||
|
rpcInit.secret = secret;
|
||||||
|
rpcInit.ckey = "key";
|
||||||
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
|
for (int i = 1; i < argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
|
||||||
|
epSet.eps[0].port = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) {
|
||||||
|
tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn));
|
||||||
|
} else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
|
||||||
|
msgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.sessions = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) {
|
||||||
|
numOfReqs = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) {
|
||||||
|
appThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
|
||||||
|
tsCompressMsgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.user = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.secret = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) {
|
||||||
|
rpcInit.spi = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
|
||||||
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||||
|
printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port);
|
||||||
|
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||||
|
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||||
|
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||||
|
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||||
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
|
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||||
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
|
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
||||||
|
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("client.log", 100000, 10);
|
||||||
|
|
||||||
|
void *pRpc = rpcOpen(&rpcInit);
|
||||||
|
if (pRpc == NULL) {
|
||||||
|
tError("failed to initialize RPC");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tInfo("client is initialized");
|
||||||
|
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
|
||||||
|
|
||||||
|
gettimeofday(&systemTime, NULL);
|
||||||
|
startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||||
|
|
||||||
|
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads);
|
||||||
|
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
for (int i = 0; i < appThreads; ++i) {
|
||||||
|
pInfo->index = i;
|
||||||
|
pInfo->epSet = epSet;
|
||||||
|
pInfo->numOfReqs = numOfReqs;
|
||||||
|
pInfo->msgSize = msgSize;
|
||||||
|
tsem_init(&pInfo->rspSem, 0, 0);
|
||||||
|
pInfo->pRpc = pRpc;
|
||||||
|
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||||
|
pInfo++;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
usleep(1);
|
||||||
|
} while (tcount < appThreads);
|
||||||
|
|
||||||
|
gettimeofday(&systemTime, NULL);
|
||||||
|
endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||||
|
float usedTime = (endTime - startTime) / 1000.0f; // mseconds
|
||||||
|
|
||||||
|
tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
|
||||||
|
tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
|
||||||
|
|
||||||
|
int ch = getchar();
|
||||||
|
UNUSED(ch);
|
||||||
|
|
||||||
|
taosCloseLog();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -15,6 +15,7 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
#include "tep.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
@ -50,6 +51,25 @@ class TransObj {
|
||||||
trans = rpcOpen(&rpcInit);
|
trans = rpcOpen(&rpcInit);
|
||||||
return trans != NULL ? true : false;
|
return trans != NULL ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool sendAndRecv() {
|
||||||
|
SEpSet epSet = {0};
|
||||||
|
epSet.inUse = 0;
|
||||||
|
addEpIntoEpSet(&epSet, "192.168.1.1", 7000);
|
||||||
|
addEpIntoEpSet(&epSet, "192.168.0.1", 7000);
|
||||||
|
|
||||||
|
if (trans == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SRpcMsg rpcMsg = {0}, reqMsg = {0};
|
||||||
|
reqMsg.pCont = rpcMallocCont(10);
|
||||||
|
reqMsg.contLen = 10;
|
||||||
|
reqMsg.ahandle = NULL;
|
||||||
|
rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg);
|
||||||
|
int code = rpcMsg.code;
|
||||||
|
std::cout << tstrerror(code) << std::endl;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
bool stop() {
|
bool stop() {
|
||||||
rpcClose(trans);
|
rpcClose(trans);
|
||||||
trans = NULL;
|
trans = NULL;
|
||||||
|
@ -75,6 +95,7 @@ class TransEnv : public ::testing::Test {
|
||||||
};
|
};
|
||||||
TEST_F(TransEnv, test_start_stop) {
|
TEST_F(TransEnv, test_start_stop) {
|
||||||
assert(tr->startCli());
|
assert(tr->startCli());
|
||||||
|
assert(tr->sendAndRecv());
|
||||||
assert(tr->stop());
|
assert(tr->stop());
|
||||||
|
|
||||||
assert(tr->startSrv());
|
assert(tr->startSrv());
|
||||||
|
|
Loading…
Reference in New Issue