From 246311d4fcfe20f35a7b3d910393f5a76341ad58 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 15:27:19 +0800 Subject: [PATCH] minor changes --- include/common/tmsg.h | 33 ++++++------------- include/libs/parser/parsenodes.h | 2 +- source/common/src/tmsg.c | 6 ++-- source/dnode/mnode/impl/src/mndProfile.c | 3 -- source/dnode/vnode/inc/tsdb.h | 2 +- source/dnode/vnode/inc/vnode.h | 4 +-- source/dnode/vnode/src/inc/tsdbMemTable.h | 2 +- source/dnode/vnode/src/tq/tq.c | 4 +-- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 12 +++---- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/src/vnd/vnodeWrite.c | 2 +- source/libs/parser/src/dataBlockMgt.c | 2 +- source/libs/parser/src/insertParser.c | 2 +- source/libs/parser/test/insertParserTest.cpp | 4 +-- source/libs/scheduler/src/scheduler.c | 2 +- source/libs/scheduler/test/schedulerTests.cpp | 2 +- 17 files changed, 35 insertions(+), 51 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cd327d61ca..015678f193 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -201,18 +201,6 @@ typedef struct SSubmitBlk { char data[]; } SSubmitBlk; -typedef struct { - /* data */ -} SSubmitReq; - -typedef struct { - /* data */ -} SSubmitRsp; - -typedef struct { - /* data */ -} SSubmitReqReader; - // Submit message for this TSDB typedef struct { SMsgHead header; @@ -220,7 +208,7 @@ typedef struct { int32_t length; int32_t numOfBlocks; char blocks[]; -} SSubmitMsg; +} SSubmitReq; typedef struct { int32_t totalLen; @@ -234,7 +222,7 @@ typedef struct { void* pMsg; } SSubmitMsgIter; -int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); +int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); @@ -244,16 +232,16 @@ typedef struct { int32_t vnode; // vnode index of failed block int32_t sid; // table index of failed block int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table -} SShellSubmitRspBlock; +} SSubmitRspBlock; typedef struct { - int32_t code; // 0-success, > 0 error code - int32_t numOfRows; // number of records the client is trying to write - int32_t affectedRows; // number of records actually written - int32_t failedRows; // number of failed records (exclude duplicate records) - int32_t numOfFailedBlocks; - SShellSubmitRspBlock failedBlocks[]; -} SShellSubmitRsp; + int32_t code; // 0-success, > 0 error code + int32_t numOfRows; // number of records the client is trying to write + int32_t affectedRows; // number of records actually written + int32_t failedRows; // number of failed records (exclude duplicate records) + int32_t numOfFailedBlocks; + SSubmitRspBlock failedBlocks[]; +} SSubmitRsp; typedef struct SSchema { int8_t type; @@ -888,7 +876,6 @@ typedef struct { int8_t precision; int8_t compressed; int32_t compLen; - int32_t numOfRows; char data[]; } SRetrieveTableRsp; diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index 292b4ab2c9..6cd5feca24 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -135,7 +135,7 @@ typedef struct SVgDataBlocks { SVgroupInfo vg; int32_t numOfTables; // number of tables in current submit block uint32_t size; - char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ... + char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ... } SVgDataBlocks; typedef struct SVnodeModifOpStmtInfo { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f6af9782ad..d450b0a6ca 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,7 +27,7 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" -int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { +int32_t tInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; @@ -36,7 +36,7 @@ int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { pIter->totalLen = pMsg->length; pIter->len = 0; pIter->pMsg = pMsg; - if (pMsg->length <= sizeof(SSubmitMsg)) { + if (pMsg->length <= sizeof(SSubmitReq)) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; } @@ -46,7 +46,7 @@ int32_t tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { if (pIter->len == 0) { - pIter->len += sizeof(SSubmitMsg); + pIter->len += sizeof(SSubmitReq); } else { SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 566dcff380..2b3bf6ca68 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -16,14 +16,11 @@ #define _DEFAULT_SOURCE #include "tglobal.h" #include "mndProfile.h" -//#include "mndConsumer.h" #include "mndDb.h" #include "mndStb.h" #include "mndMnode.h" #include "mndShow.h" -//#include "mndTopic.h" #include "mndUser.h" -//#include "mndVgroup.h" #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index a2f935683e..7bdd8fc266 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -83,7 +83,7 @@ typedef struct { STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs); void tsdbClose(STsdb *); void tsdbRemove(const char *path); -int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp); +int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 394076f433..eaf30da1bb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -72,7 +72,7 @@ typedef struct { int64_t ver; int64_t tbUid; SHashObj *tbIdHash; - const SSubmitMsg *pMsg; + const SSubmitReq *pMsg; SSubmitBlk *pBlock; SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; @@ -225,7 +225,7 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S return 0; } -void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitMsg *pMsg, int64_t ver); +void tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); // return SArray diff --git a/source/dnode/vnode/src/inc/tsdbMemTable.h b/source/dnode/vnode/src/inc/tsdbMemTable.h index c6fbdb407c..0b9d153239 100644 --- a/source/dnode/vnode/src/inc/tsdbMemTable.h +++ b/source/dnode/vnode/src/inc/tsdbMemTable.h @@ -54,7 +54,7 @@ typedef struct STsdbMemTable { STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb); void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable); -int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp); +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 252bc889f5..bbb5e56533 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -251,7 +251,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } pHead = pTopic->pReadhandle->pHead; if (pHead->head.msgType == TDMT_VND_SUBMIT) { - SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; qTaskInfo_t task = pTopic->buffer.output[pos].task; qSetStreamInput(task, pCont); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -397,7 +397,7 @@ int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) { fetchOffset++; } if (skip == 1) continue; - SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; qTaskInfo_t task = pTopic->buffer.output[pos].task; printf("current fetch offset %ld\n", fetchOffset); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9f76c6b76a..e76d43becd 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -31,7 +31,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { return pReadHandle; } -void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { +void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index b51539f697..01813af556 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -15,7 +15,7 @@ #include "tsdbDef.h" -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg); +static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows); static STbData *tsdbNewTbData(tb_uid_t uid); static void tsdbFreeTbData(STbData *pTbData); @@ -73,7 +73,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) { } } -int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp) { +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp) { SSubmitBlk * pBlock = NULL; SSubmitMsgIter msgIter = {0}; int32_t affectedrows = 0, numOfRows = 0; @@ -227,7 +227,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { +static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; SSubmitMsgIter msgIter = {0}; @@ -455,7 +455,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * /* ------------------------ REFACTORING ------------------------ */ #if 0 -int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { +int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitReq *pMsg) { SMemAllocator *pMA = pMemTable->pMA; STbData * pTbData = (STbData *)TD_MA_MALLOC(pMA, sizeof(*pTbData)); if (pTbData == NULL) { @@ -496,9 +496,9 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow* row); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static STSRow* tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); -static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitReq *pMsg); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); -static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); +static int tsdbInitSubmitMsgIter(SSubmitReq *pMsg, SSubmitMsgIter *pIter); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, STSRow* row); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 5f937f17e9..78067f8f83 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -15,7 +15,7 @@ #include "tsdbDef.h" -int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp) { +int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { // Check if mem is there. If not, create one. if (pTsdb->mem == NULL) { pTsdb->mem = tsdbNewMemTable(pTsdb); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 46e60329ae..5c39c65f9f 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -109,7 +109,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // } break; case TDMT_VND_SUBMIT: - if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr, NULL) < 0) { + if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { // TODO: handle error } break; diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index eefaa2c147..381dec4f15 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -417,7 +417,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB } int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) { - const int INSERT_HEAD_SIZE = sizeof(SSubmitMsg); + const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(payloadType); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 08fe0852b1..e138e583f5 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -121,7 +121,7 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS } static void buildMsgHeader(SVgDataBlocks* blocks) { - SSubmitMsg* submit = (SSubmitMsg*)blocks->pData; + SSubmitReq* submit = (SSubmitReq*)blocks->pData; submit->header.vgId = htonl(blocks->vg.vgId); submit->header.contLen = htonl(blocks->size); submit->length = submit->header.contLen; diff --git a/source/libs/parser/test/insertParserTest.cpp b/source/libs/parser/test/insertParserTest.cpp index 86e8b1d7aa..b6992e5157 100644 --- a/source/libs/parser/test/insertParserTest.cpp +++ b/source/libs/parser/test/insertParserTest.cpp @@ -70,7 +70,7 @@ protected: for (size_t i = 0; i < num; ++i) { SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i); cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl; - SSubmitMsg* submit = (SSubmitMsg*)vg->pData; + SSubmitReq* submit = (SSubmitReq*)vg->pData; cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl; int32_t numOfBlocks = ntohl(submit->numOfBlocks); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); @@ -93,7 +93,7 @@ protected: SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i); ASSERT_EQ(vg->numOfTables, numOfTables); ASSERT_GE(vg->size, 0); - SSubmitMsg* submit = (SSubmitMsg*)vg->pData; + SSubmitReq* submit = (SSubmitReq*)vg->pData; ASSERT_GE(ntohl(submit->length), 0); ASSERT_GE(ntohl(submit->numOfBlocks), 0); int32_t numOfBlocks = ntohl(submit->numOfBlocks); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1b6d871274..c14ae40b59 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -823,7 +823,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } - SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg; + SSubmitRsp *rsp = (SSubmitRsp *)msg; if (rsp) { pJob->resNumOfRows += rsp->affectedRows; } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 2c75683ecb..6bfff0197c 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -273,7 +273,7 @@ void *schtSendRsp(void *param) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - SShellSubmitRsp rsp = {0}; + SSubmitRsp rsp = {0}; rsp.affectedRows = 10; schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);