From 3a2a8871d849723dee685b581572412e8d3363a5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 8 Feb 2022 17:44:58 +0800 Subject: [PATCH 1/3] update stb --- include/common/tmsg.h | 31 +++++----- source/common/src/tmsg.c | 4 +- source/dnode/mgmt/impl/test/vnode/vnode.cpp | 68 ++++++++++++++++++--- source/dnode/vnode/src/vnd/vnodeWrite.c | 2 +- source/libs/parser/src/dCDAstProcess.c | 4 +- 5 files changed, 80 insertions(+), 29 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d248b5471f..6a6c3a36d8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1178,31 +1178,28 @@ typedef struct SVCreateTbReq { SSchema* pSchema; } ntbCfg; }; -} SVCreateTbReq; +} SVCreateTbReq, SVUpdateTbReq; + +typedef struct { +} SVCreateTbRsp, SVUpdateTbRsp; + +int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); +void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); +int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp); +void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp); typedef struct { uint64_t ver; // use a general definition SArray* pArray; } SVCreateTbBatchReq; -int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); -void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); -int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); -void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq); - typedef struct { - SMsgHead head; -} SVCreateTbRsp; +} SVCreateTbBatchRsp; -typedef struct { - SMsgHead head; - char name[TSDB_TABLE_FNAME_LEN]; - int8_t ignoreNotExists; -} SVAlterTbReq; - -typedef struct { - SMsgHead head; -} SVAlterTbRsp; +int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); +void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); +int32_t tSerializeSVCreateTbBatchReqp(void** buf, SVCreateTbBatchReq* pRsp); +void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pRsp); typedef struct { uint64_t ver; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3fabd2ce0d..e45b61554c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -293,7 +293,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { return buf; } -int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) { +int tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) { int tlen = 0; tlen += taosEncodeFixedU64(buf, pReq->ver); @@ -306,7 +306,7 @@ int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) { return tlen; } -void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { +void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) { uint32_t nsize = 0; buf = taosDecodeFixedU64(buf, &pReq->ver); diff --git a/source/dnode/mgmt/impl/test/vnode/vnode.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp index 11b32fbf0f..9451608653 100644 --- a/source/dnode/mgmt/impl/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/impl/test/vnode/vnode.cpp @@ -220,15 +220,69 @@ TEST_F(DndTestVnode, 03_Create_Stb) { } TEST_F(DndTestVnode, 04_ALTER_Stb) { -#if 0 - { - for (int i = 0; i < 3; ++i) { - SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); + for (int i = 0; i < 1; ++i) { + SVCreateTbReq req = {0}; + req.ver = 0; + req.name = (char*)"stb1"; + req.ttl = 0; + req.keep = 0; + req.type = TD_SUPER_TABLE; + + SSchema schemas[5] = {0}; + { + SSchema* pSchema = &schemas[0]; + pSchema->bytes = htonl(8); + pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema->name, "ts"); } + + { + SSchema* pSchema = &schemas[1]; + pSchema->bytes = htonl(4); + pSchema->type = TSDB_DATA_TYPE_INT; + strcpy(pSchema->name, "col1"); + } + + { + SSchema* pSchema = &schemas[2]; + pSchema->bytes = htonl(2); + pSchema->type = TSDB_DATA_TYPE_TINYINT; + strcpy(pSchema->name, "_tag1"); + } + + { + SSchema* pSchema = &schemas[3]; + pSchema->bytes = htonl(8); + pSchema->type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema->name, "_tag2"); + } + + { + SSchema* pSchema = &schemas[4]; + pSchema->bytes = htonl(16); + pSchema->type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema->name, "_tag3"); + } + + req.stbCfg.suid = 9527; + req.stbCfg.nCols = 2; + req.stbCfg.pSchema = &schemas[0]; + req.stbCfg.nTagCols = 3; + req.stbCfg.pTagSchema = &schemas[2]; + + int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); + SMsgHead* pHead = (SMsgHead*)rpcMallocCont(contLen); + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(2); + + void* pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); + tSerializeSVCreateTbReq(&pBuf, &req); + + SRpcMsg* pRsp = test.SendReq(TDMT_VND_ALTER_STB, (void*)pHead, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); } -#endif } TEST_F(DndTestVnode, 05_DROP_Stb) { diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 40cb02176b..e538bc85d4 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -83,7 +83,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { free(vCreateTbReq.name); break; case TDMT_VND_CREATE_TABLE: - tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); + tDeserializeSVCreateTbBatchReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 3ae89bca0a..50ae3bfd26 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -598,7 +598,7 @@ static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SPa } static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) { - int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); + int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req)); void* buf = malloc(tlen); if (buf == NULL) { // TODO: handle error @@ -608,7 +608,7 @@ static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArr ((SMsgHead*)buf)->contLen = htonl(tlen); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); + tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req)); SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks)); pVgData->vg = pTbBatch->info; From 604ff6c4306e9ba957f11797c6bddf40c48e37e6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 8 Feb 2022 19:52:48 +0800 Subject: [PATCH 2/3] update stb --- include/common/tmsg.h | 4 +- source/dnode/mnode/impl/src/mndStb.c | 109 +++-- source/dnode/mnode/impl/test/stb/stb.cpp | 482 +++++++++++++---------- source/dnode/vnode/src/vnd/vnodeWrite.c | 6 +- 4 files changed, 340 insertions(+), 261 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6a6c3a36d8..70e76517c6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -264,10 +264,10 @@ typedef struct { typedef struct { char name[TSDB_TABLE_FNAME_LEN]; - int8_t updateType; + int8_t alterType; int32_t numOfSchemas; SSchema pSchemas[]; -} SMUpdateStbReq; +} SMAltertbReq; typedef struct { int32_t pid; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ded8e2d9a0..006abcd8e2 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -33,10 +33,10 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq); -static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq); +static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq); static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq); static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp); -static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp); +static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp); static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp); static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq); static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); @@ -53,10 +53,10 @@ int32_t mndInitStb(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndStbActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMUpdateStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVUpdateStbRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp); mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq); @@ -193,6 +193,8 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb); + tfree(pStb->pColumns); + tfree(pStb->pTags); return 0; } @@ -202,10 +204,10 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { taosWLockLatch(&pOld->lock); if (pOld->numOfColumns < pNew->numOfColumns) { - void *pSchema = malloc(pOld->numOfColumns * sizeof(SSchema)); - if (pSchema != NULL) { + void *pColumns = malloc(pNew->numOfColumns * sizeof(SSchema)); + if (pColumns != NULL) { free(pOld->pColumns); - pOld->pColumns = pSchema; + pOld->pColumns = pColumns; } else { terrno = TSDB_CODE_OUT_OF_MEMORY; mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr()); @@ -214,10 +216,10 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { } if (pOld->numOfTags < pNew->numOfTags) { - void *pSchema = malloc(pOld->numOfTags * sizeof(SSchema)); - if (pSchema != NULL) { + void *pTags = malloc(pNew->numOfTags * sizeof(SSchema)); + if (pTags != NULL) { free(pOld->pTags); - pOld->pTags = pSchema; + pOld->pTags = pTags; } else { terrno = TSDB_CODE_OUT_OF_MEMORY; mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr()); @@ -575,11 +577,11 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndCheckUpdateStbReq(SMUpdateStbReq *pUpdate) { - pUpdate->numOfSchemas = htonl(pUpdate->numOfSchemas); +static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) { + pAlter->numOfSchemas = htonl(pAlter->numOfSchemas); - for (int32_t i = 0; i < pUpdate->numOfSchemas; ++i) { - SSchema *pSchema = &pUpdate->pSchemas[i]; + for (int32_t i = 0; i < pAlter->numOfSchemas; ++i) { + SSchema *pSchema = &pAlter->pSchemas[i]; pSchema->colId = htonl(pSchema->colId); pSchema->bytes = htonl(pSchema->bytes); @@ -696,8 +698,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch return 0; } -static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName, - const char *newTagName) { +static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName, const char *newTagName) { int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName); if (tag < 0) { terrno = TSDB_CODE_MND_TAG_NOT_EXIST; @@ -727,7 +728,7 @@ static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const cha return 0; } -static int32_t mndUpdateStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) { +static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) { int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name); if (tag < 0) { terrno = TSDB_CODE_MND_TAG_NOT_EXIST; @@ -810,7 +811,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const return 0; } -static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) { +static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) { int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name); if (col < 0) { terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST; @@ -849,17 +850,16 @@ static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const return 0; } - -static int32_t mndSetUpdateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; return 0; } -static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -868,8 +868,7 @@ static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } - -static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; @@ -892,7 +891,7 @@ static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pReq; action.contLen = contLen; - action.msgType = TDMT_VND_CREATE_STB; + action.msgType = TDMT_VND_ALTER_STB; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pReq); sdbCancelFetch(pSdb, pIter); @@ -905,7 +904,7 @@ static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbReq *pUpdate, SDbObj *pDb, SStbObj *pOld) { +static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { SStbObj stbObj = {0}; taosRLockLatch(&pOld->lock); memcpy(&stbObj, pOld, sizeof(SStbObj)); @@ -916,93 +915,93 @@ static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbRe int32_t code = -1; - switch (pUpdate->updateType) { + switch (pAlter->alterType) { case TSDB_ALTER_TABLE_ADD_TAG_COLUMN: - code = mndAddSuperTableTag(pOld, &stbObj, pUpdate->pSchemas, 1); + code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchemas, 1); break; case TSDB_ALTER_TABLE_DROP_TAG_COLUMN: - code = mndDropSuperTableTag(pOld, &stbObj, pUpdate->pSchemas[0].name); + code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchemas[0].name); break; case TSDB_ALTER_TABLE_UPDATE_TAG_NAME: - code = mndUpdateStbTagName(pOld, &stbObj, pUpdate->pSchemas[0].name, pUpdate->pSchemas[1].name); + code = mndAlterStbTagName(pOld, &stbObj, pAlter->pSchemas[0].name, pAlter->pSchemas[1].name); break; case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: - code = mndUpdateStbTagBytes(pOld, &stbObj, &pUpdate->pSchemas[0]); + code = mndAlterStbTagBytes(pOld, &stbObj, &pAlter->pSchemas[0]); break; case TSDB_ALTER_TABLE_ADD_COLUMN: - code = mndAddSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas, 1); + code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchemas, 1); break; case TSDB_ALTER_TABLE_DROP_COLUMN: - code = mndDropSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas[0].name); + code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchemas[0].name); break; case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: - code = mndUpdateStbColumnBytes(pOld, &stbObj, &pUpdate->pSchemas[0]); + code = mndAlterStbColumnBytes(pOld, &stbObj, &pAlter->pSchemas[0]); break; default: terrno = TSDB_CODE_MND_INVALID_STB_OPTION; break; } - if (code != 0) goto UPDATE_STB_OVER; + if (code != 0) goto ALTER_STB_OVER; code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); - if (pTrans == NULL) goto UPDATE_STB_OVER; + if (pTrans == NULL) goto ALTER_STB_OVER; - mDebug("trans:%d, used to update stb:%s", pTrans->id, pUpdate->name); + mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name); - if (mndSetUpdateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER; - if (mndSetUpdateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER; - if (mndSetUpdateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_STB_OVER; + if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; + if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; + if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto ALTER_STB_OVER; code = 0; -UPDATE_STB_OVER: +ALTER_STB_OVER: mndTransDrop(pTrans); tfree(stbObj.pTags); tfree(stbObj.pColumns); return code; } -static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SMUpdateStbReq *pUpdate = pReq->rpcMsg.pCont; +static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMAltertbReq *pAlter = pReq->rpcMsg.pCont; - mDebug("stb:%s, start to update", pUpdate->name); + mDebug("stb:%s, start to alter", pAlter->name); - if (mndCheckUpdateStbReq(pUpdate) != 0) { - mError("stb:%s, failed to update since %s", pUpdate->name, terrstr()); + if (mndCheckAlterStbReq(pAlter) != 0) { + mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); return -1; } - SStbObj *pStb = mndAcquireStb(pMnode, pUpdate->name); + SStbObj *pStb = mndAcquireStb(pMnode, pAlter->name); if (pStb == NULL) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; - mError("stb:%s, failed to update since %s", pUpdate->name, terrstr()); + mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); return -1; } - SDbObj *pDb = mndAcquireDbByStb(pMnode, pUpdate->name); + SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name); if (pDb == NULL) { mndReleaseStb(pMnode, pStb); terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("stb:%s, failed to update since %s", pUpdate->name, terrstr()); + mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); return -1; } - int32_t code = mndUpdateStb(pMnode, pReq, pUpdate, pDb, pStb); + int32_t code = mndAlterStb(pMnode, pReq, pAlter, pDb, pStb); mndReleaseStb(pMnode, pStb); if (code != 0) { - mError("stb:%s, failed to update since %s", pUpdate->name, tstrerror(code)); + mError("stb:%s, failed to alter since %s", pAlter->name, tstrerror(code)); return code; } return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index d6577e5e40..ed0beb50a4 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -21,213 +21,289 @@ class MndTestStb : public ::testing::Test { public: void SetUp() override {} void TearDown() override {} + + SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen); + SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen); + SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen); }; Testbase MndTestStb::test; -TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { - { - int32_t contLen = sizeof(SCreateDbReq); +SCreateDbReq* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { + int32_t contLen = sizeof(SCreateDbReq); - SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen); - strcpy(pReq->db, "1.d1"); - pReq->numOfVgroups = htonl(2); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRows = htonl(100); - pReq->maxRows = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replications = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->ignoreExist = 1; + SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen); + strcpy(pReq->db, dbname); + pReq->numOfVgroups = htonl(2); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->maxRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replications = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->ignoreExist = 1; - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); - } - - { - int32_t cols = 2; - int32_t tags = 3; - int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq); - - SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen); - strcpy(pReq->name, "1.d1.stb"); - pReq->numOfTags = htonl(tags); - pReq->numOfColumns = htonl(cols); - - { - SSchema* pSchema = &pReq->pSchemas[0]; - pSchema->bytes = htonl(8); - pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema->name, "ts"); - } - - { - SSchema* pSchema = &pReq->pSchemas[1]; - pSchema->bytes = htonl(4); - pSchema->type = TSDB_DATA_TYPE_INT; - strcpy(pSchema->name, "col1"); - } - - { - SSchema* pSchema = &pReq->pSchemas[2]; - pSchema->bytes = htonl(2); - pSchema->type = TSDB_DATA_TYPE_TINYINT; - strcpy(pSchema->name, "tag1"); - } - - { - SSchema* pSchema = &pReq->pSchemas[3]; - pSchema->bytes = htonl(8); - pSchema->type = TSDB_DATA_TYPE_BIGINT; - strcpy(pSchema->name, "tag2"); - } - - { - SSchema* pSchema = &pReq->pSchemas[4]; - pSchema->bytes = htonl(16); - pSchema->type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema->name, "tag3"); - } - - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); - } - - test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1"); - CHECK_META("show stables", 4); - - CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name"); - CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); - CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns"); - CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags"); - - test.SendShowRetrieveReq(); - EXPECT_EQ(test.GetShowRows(), 1); - CheckBinary("stb", TSDB_TABLE_NAME_LEN); - CheckTimestamp(); - CheckInt32(2); - CheckInt32(3); - - // ----- meta ------ - { - int32_t contLen = sizeof(STableInfoReq); - - STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen); - strcpy(pReq->dbFName, "1.d1"); - strcpy(pReq->tbName, "stb"); - - SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen); - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - - STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont; - pRsp->numOfTags = htonl(pRsp->numOfTags); - pRsp->numOfColumns = htonl(pRsp->numOfColumns); - pRsp->sversion = htonl(pRsp->sversion); - pRsp->tversion = htonl(pRsp->tversion); - pRsp->suid = be64toh(pRsp->suid); - pRsp->tuid = be64toh(pRsp->tuid); - pRsp->vgId = be64toh(pRsp->vgId); - for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) { - SSchema* pSchema = &pRsp->pSchema[i]; - pSchema->colId = htonl(pSchema->colId); - pSchema->bytes = htonl(pSchema->bytes); - } - - EXPECT_STREQ(pRsp->dbFName, "1.d1"); - EXPECT_STREQ(pRsp->tbName, "stb"); - EXPECT_STREQ(pRsp->stbName, "stb"); - EXPECT_EQ(pRsp->numOfColumns, 2); - EXPECT_EQ(pRsp->numOfTags, 3); - EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); - EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE); - EXPECT_EQ(pRsp->update, 0); - EXPECT_EQ(pRsp->sversion, 1); - EXPECT_EQ(pRsp->tversion, 0); - EXPECT_GT(pRsp->suid, 0); - EXPECT_GT(pRsp->tuid, 0); - EXPECT_EQ(pRsp->vgId, 0); - - { - SSchema* pSchema = &pRsp->pSchema[0]; - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); - EXPECT_EQ(pSchema->colId, 1); - EXPECT_EQ(pSchema->bytes, 8); - EXPECT_STREQ(pSchema->name, "ts"); - } - - { - SSchema* pSchema = &pRsp->pSchema[1]; - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); - EXPECT_EQ(pSchema->colId, 2); - EXPECT_EQ(pSchema->bytes, 4); - EXPECT_STREQ(pSchema->name, "col1"); - } - - { - SSchema* pSchema = &pRsp->pSchema[2]; - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT); - EXPECT_EQ(pSchema->colId, 3); - EXPECT_EQ(pSchema->bytes, 2); - EXPECT_STREQ(pSchema->name, "tag1"); - } - - { - SSchema* pSchema = &pRsp->pSchema[3]; - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT); - EXPECT_EQ(pSchema->colId, 4); - EXPECT_EQ(pSchema->bytes, 8); - EXPECT_STREQ(pSchema->name, "tag2"); - } - - { - SSchema* pSchema = &pRsp->pSchema[4]; - EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); - EXPECT_EQ(pSchema->colId, 5); - EXPECT_EQ(pSchema->bytes, 16); - EXPECT_STREQ(pSchema->name, "tag3"); - } - } - - // restart - test.Restart(); - - test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1"); - CHECK_META("show stables", 4); - test.SendShowRetrieveReq(); - EXPECT_EQ(test.GetShowRows(), 1); - - CheckBinary("stb", TSDB_TABLE_NAME_LEN); - CheckTimestamp(); - CheckInt32(2); - CheckInt32(3); - - { - int32_t contLen = sizeof(SMDropStbReq); - - SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen); - strcpy(pReq->name, "1.d1.stb"); - - SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, 0); - } - - test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1"); - CHECK_META("show stables", 4); - test.SendShowRetrieveReq(); - EXPECT_EQ(test.GetShowRows(), 0); + *pContLen = contLen; + return pReq; +} + +SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char *stbname, int32_t* pContLen) { + int32_t cols = 2; + int32_t tags = 3; + int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq); + + SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen); + strcpy(pReq->name, stbname); + pReq->numOfTags = htonl(tags); + pReq->numOfColumns = htonl(cols); + + { + SSchema* pSchema = &pReq->pSchemas[0]; + pSchema->bytes = htonl(8); + pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema->name, "ts"); + } + + { + SSchema* pSchema = &pReq->pSchemas[1]; + pSchema->bytes = htonl(4); + pSchema->type = TSDB_DATA_TYPE_INT; + strcpy(pSchema->name, "col1"); + } + + { + SSchema* pSchema = &pReq->pSchemas[2]; + pSchema->bytes = htonl(2); + pSchema->type = TSDB_DATA_TYPE_TINYINT; + strcpy(pSchema->name, "tag1"); + } + + { + SSchema* pSchema = &pReq->pSchemas[3]; + pSchema->bytes = htonl(8); + pSchema->type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema->name, "tag2"); + } + + { + SSchema* pSchema = &pReq->pSchemas[4]; + pSchema->bytes = htonl(16); + pSchema->type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema->name, "tag3"); + } + + *pContLen = contLen; + return pReq; +} + +// TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { +// const char *dbname = "1.d1"; +// const char *stbname = "1.d1.stb"; + +// { +// int32_t contLen = 0; +// SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen); +// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen); +// ASSERT_NE(pRsp, nullptr); +// ASSERT_EQ(pRsp->code, 0); +// } + +// { +// int32_t contLen = 0; +// SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); +// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); +// ASSERT_NE(pRsp, nullptr); +// ASSERT_EQ(pRsp->code, 0); +// } + +// { +// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname); +// CHECK_META("show stables", 4); +// CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name"); +// CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); +// CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns"); +// CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags"); + +// test.SendShowRetrieveReq(); +// EXPECT_EQ(test.GetShowRows(), 1); +// CheckBinary("stb", TSDB_TABLE_NAME_LEN); +// CheckTimestamp(); +// CheckInt32(2); +// CheckInt32(3); +// } + +// // ----- meta ------ +// { +// int32_t contLen = sizeof(STableInfoReq); +// STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen); +// strcpy(pReq->dbFName, dbname); +// strcpy(pReq->tbName, "stb"); + +// SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); + +// STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont; +// pRsp->numOfTags = htonl(pRsp->numOfTags); +// pRsp->numOfColumns = htonl(pRsp->numOfColumns); +// pRsp->sversion = htonl(pRsp->sversion); +// pRsp->tversion = htonl(pRsp->tversion); +// pRsp->suid = be64toh(pRsp->suid); +// pRsp->tuid = be64toh(pRsp->tuid); +// pRsp->vgId = be64toh(pRsp->vgId); +// for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) { +// SSchema* pSchema = &pRsp->pSchema[i]; +// pSchema->colId = htonl(pSchema->colId); +// pSchema->bytes = htonl(pSchema->bytes); +// } + +// EXPECT_STREQ(pRsp->dbFName, dbname); +// EXPECT_STREQ(pRsp->tbName, "stb"); +// EXPECT_STREQ(pRsp->stbName, "stb"); +// EXPECT_EQ(pRsp->numOfColumns, 2); +// EXPECT_EQ(pRsp->numOfTags, 3); +// EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); +// EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE); +// EXPECT_EQ(pRsp->update, 0); +// EXPECT_EQ(pRsp->sversion, 1); +// EXPECT_EQ(pRsp->tversion, 0); +// EXPECT_GT(pRsp->suid, 0); +// EXPECT_GT(pRsp->tuid, 0); +// EXPECT_EQ(pRsp->vgId, 0); + +// { +// SSchema* pSchema = &pRsp->pSchema[0]; +// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); +// EXPECT_EQ(pSchema->colId, 1); +// EXPECT_EQ(pSchema->bytes, 8); +// EXPECT_STREQ(pSchema->name, "ts"); +// } + +// { +// SSchema* pSchema = &pRsp->pSchema[1]; +// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); +// EXPECT_EQ(pSchema->colId, 2); +// EXPECT_EQ(pSchema->bytes, 4); +// EXPECT_STREQ(pSchema->name, "col1"); +// } + +// { +// SSchema* pSchema = &pRsp->pSchema[2]; +// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT); +// EXPECT_EQ(pSchema->colId, 3); +// EXPECT_EQ(pSchema->bytes, 2); +// EXPECT_STREQ(pSchema->name, "tag1"); +// } + +// { +// SSchema* pSchema = &pRsp->pSchema[3]; +// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT); +// EXPECT_EQ(pSchema->colId, 4); +// EXPECT_EQ(pSchema->bytes, 8); +// EXPECT_STREQ(pSchema->name, "tag2"); +// } + +// { +// SSchema* pSchema = &pRsp->pSchema[4]; +// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); +// EXPECT_EQ(pSchema->colId, 5); +// EXPECT_EQ(pSchema->bytes, 16); +// EXPECT_STREQ(pSchema->name, "tag3"); +// } +// } + +// // restart +// test.Restart(); + +// { +// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname); +// CHECK_META("show stables", 4); +// test.SendShowRetrieveReq(); +// EXPECT_EQ(test.GetShowRows(), 1); + +// CheckBinary("stb", TSDB_TABLE_NAME_LEN); +// CheckTimestamp(); +// CheckInt32(2); +// CheckInt32(3); +// } + +// { +// int32_t contLen = sizeof(SMDropStbReq); + +// SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen); +// strcpy(pReq->name, stbname); + +// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen); +// ASSERT_NE(pRsp, nullptr); +// ASSERT_EQ(pRsp->code, 0); +// } + +// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname); +// CHECK_META("show stables", 4); +// test.SendShowRetrieveReq(); +// EXPECT_EQ(test.GetShowRows(), 0); +// } + +SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen) { + int32_t contLen = sizeof(SMAltertbReq) + sizeof(SSchema); + SMAltertbReq* pReq = (SMAltertbReq*)rpcMallocCont(contLen); + strcpy(pReq->name, stbname); + pReq->numOfSchemas = htonl(1); + pReq->alterType = TSDB_ALTER_TABLE_ADD_TAG_COLUMN; + + SSchema* pSchema = &pReq->pSchemas[0]; + pSchema->bytes = htonl(4); + pSchema->type = TSDB_DATA_TYPE_INT; + strcpy(pSchema->name, "tag4"); + + *pContLen = contLen; + return pReq; +} + +TEST_F(MndTestStb, 01_Alter_Stb) { + const char *dbname = "1.d2"; + const char *stbname = "1.d2.stb"; + + { + int32_t contLen = 0; + SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = 0; + SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = 0; + SMAltertbReq* pReq = BuildAlterStbAddTagReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + + test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + CheckBinary("stb", TSDB_TABLE_NAME_LEN); + CheckTimestamp(); + CheckInt32(2); + CheckInt32(4); + } } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index e538bc85d4..28487821e6 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -106,7 +106,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { break; case TDMT_VND_ALTER_STB: - vTrace("vgId:%d, process drop stb req", pVnode->vgId); + vTrace("vgId:%d, process alter stb req", pVnode->vgId); + tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); + free(vCreateTbReq.stbCfg.pSchema); + free(vCreateTbReq.stbCfg.pTagSchema); + free(vCreateTbReq.name); break; case TDMT_VND_DROP_STB: vTrace("vgId:%d, process drop stb req", pVnode->vgId); From c901a21a8c800ec888c1e9cb250481cfdb9e4188 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 8 Feb 2022 21:23:43 +0800 Subject: [PATCH 3/3] enhance interface --- source/libs/transport/inc/transComm.h | 10 +- source/libs/transport/src/rpcMain.c | 4 +- source/libs/transport/src/trans.c | 34 +++- source/libs/transport/src/transCli.c | 69 ++++++- source/libs/transport/src/transSrv.c | 24 ++- source/libs/transport/test/CMakeLists.txt | 19 +- source/libs/transport/test/rclient.c | 1 - source/libs/transport/test/syncClient.c | 220 ++++++++++++++++++++++ source/libs/transport/test/transUT.cc | 21 +++ 9 files changed, 380 insertions(+), 22 deletions(-) create mode 100644 source/libs/transport/test/syncClient.c diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 082c89fed4..846f2d5099 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -134,10 +134,12 @@ typedef struct { // int16_t numOfTry; // number of try for different servers // int8_t oldInUse; // server EP inUse passed by app // int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + char* ip; uint32_t port; // SEpSet* pSet; // for synchronous API diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index a286482fc1..d8ef0462fb 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -813,8 +813,8 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { SRpcInfo *pRpc = pContext->pRpc; SEpSet * pEpSet = &pContext->epSet; - pConn = - rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType); + pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, + pContext->connType); if (pConn == NULL || pConn->user[0] == 0) { pConn = rpcOpenConn(pRpc, pEpSet->eps[pEpSet->inUse].fqdn, pEpSet->eps[pEpSet->inUse].port, pContext->connType); } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 91f9a8ead2..a6040a3873 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -63,17 +63,41 @@ void rpcFreeCont(void* cont) { } free((char*)cont - TRANS_MSG_OVERHEAD); } -void* rpcReallocCont(void* ptr, int contLen) { return NULL; } +void* rpcReallocCont(void* ptr, int contLen) { + if (ptr == NULL) { + return rpcMallocCont(contLen); + } + char* st = (char*)ptr - TRANS_MSG_OVERHEAD; + int sz = contLen + TRANS_MSG_OVERHEAD; + st = realloc(st, sz); + if (st == NULL) { + return NULL; + } + return st + TRANS_MSG_OVERHEAD; +} + +void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { + SRpcMsg rpcMsg; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + + rpcMsg.contLen = sizeof(SEpSet); + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + if (rpcMsg.pCont == NULL) return; + + memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet)); + + rpcMsg.code = TSDB_CODE_RPC_REDIRECT; + rpcMsg.handle = thandle; + + rpcSendResponse(&rpcMsg); +} -void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } int32_t rpcInit(void) { // impl later - return -1; + return 0; } void rpcCleanup(void) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 24ff5e956a..3d93049c6a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -123,9 +123,14 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.code = pHead->code; rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - - tDebug("conn %p handle resp", conn); - (pRpc->cfp)(NULL, &rpcMsg, NULL); + if (pCtx->pSem == NULL) { + tDebug("conn %p handle resp", conn); + (pRpc->cfp)(NULL, &rpcMsg, NULL); + } else { + tDebug("conn %p handle resp", conn); + memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); + tsem_post(pCtx->pSem); + } conn->notifyCount += 1; // buf's mem alread translated to rpcMsg.pCont @@ -159,14 +164,20 @@ static void clientHandleExcept(SCliConn* pConn) { SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); - pConn->notifyCount += 1; + if (pCtx->pSem == NULL) { + // SRpcInfo* pRpc = pMsg->ctx->pRpc; + (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); + } else { + memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + // SRpcMsg rpcMsg + tsem_post(pCtx->pSem); + } destroyCmsg(pMsg); pConn->data = NULL; // transDestroyConnCtx(pCtx); clientConnDestroy(pConn, true); + pConn->notifyCount += 1; } static void clientTimeoutCb(uv_timer_t* handle) { @@ -463,6 +474,7 @@ static void clientAsyncCb(uv_async_t* handle) { static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; + setThreadName("trans-client-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -568,8 +580,8 @@ void taosCloseClient(void* arg) { } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { // impl later - char* ip = (char*)(pEpSet->fqdn[pEpSet->inUse]); - uint32_t port = pEpSet->port[pEpSet->inUse]; + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; SRpcInfo* pRpc = (SRpcInfo*)shandle; @@ -609,4 +621,45 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* // int end = taosGetTimestampUs() - start; // tError("client sent to rpc, time cost: %d", (int)end); } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + + SRpcInfo* pRpc = (SRpcInfo*)shandle; + + STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); + pCtx->pTransInst = (SRpcInfo*)shandle; + pCtx->ahandle = pReq->ahandle; + pCtx->msgType = pReq->msgType; + pCtx->ip = strdup(ip); + pCtx->port = port; + pCtx->pSem = calloc(1, sizeof(tsem_t)); + pCtx->pRsp = pRsp; + tsem_init(pCtx->pSem, 0, 0); + + int64_t index = pRpc->index; + if (pRpc->index++ >= pRpc->numOfThreads) { + pRpc->index = 0; + } + SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->msg = *pReq; + cliMsg->st = taosGetTimestampUs(); + + SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; + + // pthread_mutex_lock(&thrd->msgMtx); + // QUEUE_PUSH(&thrd->msg, &cliMsg->q); + // pthread_mutex_unlock(&thrd->msgMtx); + + // int start = taosGetTimestampUs(); + transSendAsync(thrd->asyncPool, &(cliMsg->q)); + + tsem_t* pSem = pCtx->pSem; + tsem_wait(pSem); + tsem_destroy(pSem); + free(pSem); + + return; +} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index a005b31fe4..4d2ac434dd 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -33,6 +33,8 @@ typedef struct SSrvConn { void* hostThrd; void* pSrvMsg; + struct sockaddr peername; + // SRpcMsg sendMsg; // del later char secured; @@ -487,7 +489,13 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("conn %p created, fd: %d", pConn, fd); - uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + int namelen = sizeof(pConn->peername); + if (0 != uv_tcp_getpeername(pConn->pTcp, &pConn->peername, &namelen)) { + tError("failed to get peer name"); + destroyConn(pConn, true); + } else { + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + } } else { tDebug("failed to create new connection"); destroyConn(pConn, true); @@ -496,6 +504,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void* acceptThread(void* arg) { // opt + setThreadName("trans-accept"); SServerObj* srv = (SServerObj*)arg; uv_run(srv->loop, UV_RUN_DEFAULT); } @@ -548,6 +557,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* workerThread(void* arg) { + setThreadName("trans-worker"); SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -723,4 +733,16 @@ void rpcSendResponse(const SRpcMsg* pMsg) { // uv_async_send(pThrd->workerAsync); } +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { + SSrvConn* pConn = thandle; + struct sockaddr* pPeerName = &pConn->peername; + + struct sockaddr_in caddr = *(struct sockaddr_in*)(pPeerName); + pInfo->clientIp = (uint32_t)(caddr.sin_addr.s_addr); + pInfo->clientPort = ntohs(caddr.sin_port); + + tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); + return 0; +} + #endif diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index 3d9c396336..3c9c40f46a 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(transportTest "") add_executable(client "") add_executable(server "") add_executable(transUT "") +add_executable(syncClient "") target_sources(transUT PRIVATE @@ -20,6 +21,10 @@ target_sources (server PRIVATE "rserver.c" ) +target_sources (syncClient + PRIVATE + "syncClient.c" +) target_include_directories(transportTest PUBLIC @@ -67,7 +72,6 @@ target_include_directories(transUT "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) - target_link_libraries (server os util @@ -75,4 +79,17 @@ target_link_libraries (server gtest_main transport ) +target_include_directories(syncClient + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_link_libraries (syncClient + os + util + common + gtest_main + transport +) + diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 308b7b54bd..4e29c02508 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -33,7 +33,6 @@ typedef struct { pthread_t thread; void * pRpc; } SInfo; - static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, diff --git a/source/libs/transport/test/syncClient.c b/source/libs/transport/test/syncClient.c new file mode 100644 index 0000000000..c5d7f5664a --- /dev/null +++ b/source/libs/transport/test/syncClient.c @@ -0,0 +1,220 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include "os.h" +#include "rpcLog.h" +#include "taoserror.h" +#include "tglobal.h" +#include "trpc.h" +#include "tutil.h" + +typedef struct { + int index; + SEpSet epSet; + int num; + int numOfReqs; + int msgSize; + tsem_t rspSem; + tsem_t * pOverSem; + pthread_t thread; + void * pRpc; +} SInfo; +static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SInfo *pInfo = (SInfo *)pMsg->ahandle; + tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + pMsg->code); + + if (pEpSet) pInfo->epSet = *pEpSet; + + rpcFreeCont(pMsg->pCont); + // tsem_post(&pInfo->rspSem); + tsem_post(&pInfo->rspSem); +} + +static int tcount = 0; + +static void *sendRequest(void *param) { + SInfo * pInfo = (SInfo *)param; + SRpcMsg rpcMsg = {0}; + + tDebug("thread:%d, start to send request", pInfo->index); + + tDebug("thread:%d, reqs: %d", pInfo->index, pInfo->numOfReqs); + int u100 = 0; + int u500 = 0; + int u1000 = 0; + int u10000 = 0; + SRpcMsg respMsg = {0}; + while (pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.ahandle = pInfo; + rpcMsg.msgType = 1; + // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + int64_t start = taosGetTimestampUs(); + rpcSendRecv(pInfo->pRpc, &pInfo->epSet, &rpcMsg, &respMsg); + // rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); + if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + // tsem_wait(&pInfo->rspSem); + // wtsem_wait(&pInfo->rspSem); + int64_t end = taosGetTimestampUs() - start; + if (end <= 100) { + u100++; + } else if (end > 100 && end <= 500) { + u500++; + } else if (end > 500 && end < 1000) { + u1000++; + } else { + u10000++; + } + + tDebug("recv response succefully"); + + // usleep(100000000); + } + + tError("send and recv sum: %d, %d, %d, %d", u100, u500, u1000, u10000); + tDebug("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SEpSet epSet = {0}; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + char secret[20] = "mypassword"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + epSet.inUse = 0; + addEpIntoEpSet(&epSet, serverIp, 7000); + addEpIntoEpSet(&epSet, "192.168.0.1", 7000); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processResponse; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = "michael"; + rpcInit.secret = secret; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i = 1; i < argc; ++i) { + if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { + epSet.eps[0].port = atoi(argv[++i]); + } else if (strcmp(argv[i], "-i") == 0 && i < argc - 1) { + tstrncpy(epSet.eps[0].fqdn, argv[++i], sizeof(epSet.eps[0].fqdn)); + } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) { + rpcInit.numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) { + msgSize = atoi(argv[++i]); + } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) { + rpcInit.sessions = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0 && i < argc - 1) { + numOfReqs = atoi(argv[++i]); + } else if (strcmp(argv[i], "-a") == 0 && i < argc - 1) { + appThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) { + tsCompressMsgSize = atoi(argv[++i]); + } else if (strcmp(argv[i], "-u") == 0 && i < argc - 1) { + rpcInit.user = argv[++i]; + } else if (strcmp(argv[i], "-k") == 0 && i < argc - 1) { + rpcInit.secret = argv[++i]; + } else if (strcmp(argv[i], "-spi") == 0 && i < argc - 1) { + rpcInit.spi = atoi(argv[++i]); + } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) { + rpcDebugFlag = atoi(argv[++i]); + } else { + printf("\nusage: %s [options] \n", argv[0]); + printf(" [-i ip]: first server IP address, default is:%s\n", serverIp); + printf(" [-p port]: server port number, default is:%d\n", epSet.eps[0].port); + printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads); + printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions); + printf(" [-m msgSize]: message body size, default is:%d\n", msgSize); + printf(" [-a threads]: number of app threads, default is:%d\n", appThreads); + printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs); + printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize); + printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user); + printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret); + printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi); + printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag); + printf(" [-h help]: print out this help\n\n"); + exit(0); + } + } + + taosInitLog("client.log", 100000, 10); + + void *pRpc = rpcOpen(&rpcInit); + if (pRpc == NULL) { + tError("failed to initialize RPC"); + return -1; + } + + tInfo("client is initialized"); + tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs); + + gettimeofday(&systemTime, NULL); + startTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + + SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo) * appThreads); + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + for (int i = 0; i < appThreads; ++i) { + pInfo->index = i; + pInfo->epSet = epSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + tsem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while (tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime) / 1000.0f; // mseconds + + tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); + tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); + + int ch = getchar(); + UNUSED(ch); + + taosCloseLog(); + + return 0; +} diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 08c683590b..6f80ea42ac 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -15,6 +15,7 @@ #include #include #include +#include "tep.h" #include "trpc.h" using namespace std; @@ -50,6 +51,25 @@ class TransObj { trans = rpcOpen(&rpcInit); return trans != NULL ? true : false; } + + bool sendAndRecv() { + SEpSet epSet = {0}; + epSet.inUse = 0; + addEpIntoEpSet(&epSet, "192.168.1.1", 7000); + addEpIntoEpSet(&epSet, "192.168.0.1", 7000); + + if (trans == NULL) { + return false; + } + SRpcMsg rpcMsg = {0}, reqMsg = {0}; + reqMsg.pCont = rpcMallocCont(10); + reqMsg.contLen = 10; + reqMsg.ahandle = NULL; + rpcSendRecv(trans, &epSet, &reqMsg, &rpcMsg); + int code = rpcMsg.code; + std::cout << tstrerror(code) << std::endl; + return true; + } bool stop() { rpcClose(trans); trans = NULL; @@ -75,6 +95,7 @@ class TransEnv : public ::testing::Test { }; TEST_F(TransEnv, test_start_stop) { assert(tr->startCli()); + assert(tr->sendAndRecv()); assert(tr->stop()); assert(tr->startSrv());