diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index eb2adda394..39107e0b4f 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -785,12 +785,8 @@ typedef struct { } SAuthVnodeMsg; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; -} SStbInfoMsg; - -typedef struct { - SMsgHead msgHead; - char tableFname[TSDB_TABLE_FNAME_LEN]; + int32_t vgId; + char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; typedef struct { @@ -801,10 +797,6 @@ typedef struct { char tableNames[]; } SMultiTableInfoMsg; -typedef struct SSTableVgroupMsg { - int32_t numOfTables; -} SSTableVgroupMsg, SSTableVgroupRspMsg; - typedef struct SVgroupInfo { int32_t vgId; uint32_t hashBegin; @@ -814,12 +806,6 @@ typedef struct SVgroupInfo { SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; } SVgroupInfo; -typedef struct SVgroupListRspMsg { - int32_t vgroupNum; - int32_t vgroupVersion; - SVgroupInfo vgroupInfo[]; -} SVgroupListRspMsg; - typedef struct { int32_t vgId; int8_t numOfEps; @@ -841,8 +827,8 @@ typedef struct { int8_t update; int32_t sversion; int32_t tversion; - uint64_t tuid; uint64_t suid; + uint64_t tuid; int32_t vgId; SSchema pSchema[]; } STableMetaMsg; diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 8720fd085c..fc0ea1729b 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -23,8 +23,6 @@ extern "C" { #include "tarray.h" #include "thash.h" -typedef SVgroupListRspMsg SVgroupListInfo; - typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 6dc46cefcd..17cf39b019 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -38,7 +38,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_TABLES_META] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index 40af7fbb52..a6addbe5a9 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -179,7 +179,7 @@ SServer* DndTestStb::pServer; SClient* DndTestStb::pClient; int32_t DndTestStb::connId; -TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) { +TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { { SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); strcpy(pReq->db, "1.d1"); @@ -215,8 +215,8 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) { } { - int32_t tags = 2; - int32_t cols = 3; + int32_t cols = 2; + int32_t tags = 3; int32_t size = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg); SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(size); @@ -266,7 +266,7 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SCreateStbMsg); + rpcMsg.contLen = size; rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_STB; sendMsg(pClient, &rpcMsg); @@ -274,66 +274,70 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) { ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); } - taosMsleep(10000); - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show stables", 4, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1"); CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name"); CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); - CheckSchema(2, TSDB_DATA_TYPE_INT, 2, "columns"); - CheckSchema(3, TSDB_DATA_TYPE_INT, 2, "tags"); + CheckSchema(2, TSDB_DATA_TYPE_INT, 4, "columns"); + CheckSchema(3, TSDB_DATA_TYPE_INT, 4, "tags"); SendThenCheckShowRetrieveMsg(1); - CheckBinary("stb", TSDB_DB_NAME_LEN - 1); + CheckBinary("stb", TSDB_TABLE_NAME_LEN); CheckTimestamp(); - CheckInt16(2); - CheckInt16(3); + CheckInt32(2); + CheckInt32(3); -#if 0 + // ----- meta ------ { - SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg)); - strcpy(pReq->db, "1.d1"); - pReq->totalBlocks = htonl(12); - pReq->daysToKeep0 = htonl(300); - pReq->daysToKeep1 = htonl(400); - pReq->daysToKeep2 = htonl(500); - pReq->fsyncPeriod = htonl(4000); - pReq->walLevel = 2; - pReq->quorum = 2; - pReq->cacheLastRow = 1; + STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(sizeof(STableInfoMsg)); + strcpy(pReq->tableFname, "1.d1.stb"); SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SAlterDbMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_DB; + rpcMsg.contLen = sizeof(STableInfoMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_TABLE_META; sendMsg(pClient, &rpcMsg); SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); + + STableMetaMsg* pRsp = (STableMetaMsg*)pMsg->pCont; + pRsp->numOfTags = htonl(pRsp->numOfTags); + pRsp->numOfColumns = htonl(pRsp->numOfColumns); + pRsp->sversion = htonl(pRsp->sversion); + pRsp->tversion = htonl(pRsp->tversion); + pRsp->suid = htobe64(pRsp->suid); + pRsp->tuid = htobe64(pRsp->tuid); + pRsp->vgId = htobe64(pRsp->vgId); + for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) { + SSchema* pSchema = &pRsp->pSchema[i]; + pSchema->colId = htonl(pSchema->colId); + pSchema->bytes = htonl(pSchema->bytes); + } + + EXPECT_STREQ(pRsp->tbFname, ""); + EXPECT_STREQ(pRsp->stbFname, "1.d1.stb"); + EXPECT_EQ(pRsp->numOfColumns, 2); + EXPECT_EQ(pRsp->numOfTags, 3); + EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE); + EXPECT_EQ(pRsp->update, 0); + EXPECT_EQ(pRsp->sversion, 1); + EXPECT_EQ(pRsp->tversion, 0); + EXPECT_GT(pRsp->suid, 0); + EXPECT_EQ(pRsp->tuid, 0); + EXPECT_EQ(pRsp->vgId, 0); + + { + SSchema* pSchema = &pRsp->pSchema[0]; + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->bytes, 8); + EXPECT_STREQ(pSchema->name, "ts"); + } } - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); - SendThenCheckShowRetrieveMsg(1); - CheckBinary("d1", TSDB_DB_NAME_LEN - 1); - CheckTimestamp(); - CheckInt16(2); // vgroups - CheckInt16(1); // replica - CheckInt16(2); // quorum - CheckInt16(10); // days - CheckBinary("300,400,500", 24); // days - CheckInt32(16); // cache - CheckInt32(12); // blocks - CheckInt32(100); // minrows - CheckInt32(4096); // maxrows - CheckInt8(2); // wallevel - CheckInt32(4000); // fsync - CheckInt8(2); // comp - CheckInt8(1); // cachelast - CheckBinary("ms", 3); // precision - CheckInt8(0); // update - -#endif - // restart stopServer(pServer); pServer = NULL; @@ -346,19 +350,13 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) { uInfo("all server is running"); - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show stables", 4, NULL); - CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name"); - CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); - CheckSchema(2, TSDB_DATA_TYPE_INT, 2, "columns"); - CheckSchema(3, TSDB_DATA_TYPE_INT, 2, "tags"); - + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1"); SendThenCheckShowRetrieveMsg(1); - CheckBinary("stb", TSDB_DB_NAME_LEN - 1); + CheckBinary("stb", TSDB_TABLE_NAME_LEN); CheckTimestamp(); - CheckInt16(2); - CheckInt16(3); + CheckInt32(2); + CheckInt32(3); -#if 0 { SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(sizeof(SDropStbMsg)); strcpy(pReq->name, "1.d1.stb"); @@ -374,106 +372,6 @@ TEST_F(DndTestStb, 01_Create_Alter_Drop_Stb) { ASSERT_EQ(pMsg->code, 0); } - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show stables", 4, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1"); SendThenCheckShowRetrieveMsg(0); -#endif } - -#if 0 -TEST_F(DndTestStb, 03_Create_Use_Restart_Use_Db) { - { - SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); - strcpy(pReq->db, "1.d2"); - pReq->numOfVgroups = htonl(2); - pReq->cacheBlockSize = htonl(16); - pReq->totalBlocks = htonl(10); - pReq->daysPerFile = htonl(10); - pReq->daysToKeep0 = htonl(3650); - pReq->daysToKeep1 = htonl(3650); - pReq->daysToKeep2 = htonl(3650); - pReq->minRowsPerFileBlock = htonl(100); - pReq->maxRowsPerFileBlock = htonl(4096); - pReq->commitTime = htonl(3600); - pReq->fsyncPeriod = htonl(3000); - pReq->walLevel = 1; - pReq->precision = 0; - pReq->compression = 2; - pReq->replications = 1; - pReq->quorum = 1; - pReq->update = 0; - pReq->cacheLastRow = 0; - pReq->ignoreExist = 1; - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SCreateDbMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB; - - sendMsg(pClient, &rpcMsg); - SRpcMsg* pMsg = pClient->pRsp; - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - } - - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); - SendThenCheckShowRetrieveMsg(1); - CheckBinary("d2", TSDB_DB_NAME_LEN - 1); - - { - SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(sizeof(SUseDbMsg)); - strcpy(pReq->db, "1.d2"); - pReq->vgVersion = htonl(-1); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SUseDbMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_USE_DB; - - sendMsg(pClient, &rpcMsg); - SRpcMsg* pMsg = pClient->pRsp; - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - - SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; - EXPECT_STREQ(pRsp->db, "1.d2"); - pRsp->vgVersion = htonl(pRsp->vgVersion); - pRsp->vgNum = htonl(pRsp->vgNum); - pRsp->hashMethod = pRsp->hashMethod; - EXPECT_EQ(pRsp->vgVersion, 1); - EXPECT_EQ(pRsp->vgNum, 2); - EXPECT_EQ(pRsp->hashMethod, 1); - - { - SVgroupInfo* pInfo = &pRsp->vgroupInfo[0]; - pInfo->vgId = htonl(pInfo->vgId); - pInfo->hashBegin = htonl(pInfo->hashBegin); - pInfo->hashEnd = htonl(pInfo->hashEnd); - EXPECT_GT(pInfo->vgId, 0); - EXPECT_EQ(pInfo->hashBegin, 0); - EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); - EXPECT_EQ(pInfo->inUse, 0); - EXPECT_EQ(pInfo->numOfEps, 1); - SEpAddrMsg* pAddr = &pInfo->epAddr[0]; - pAddr->port = htons(pAddr->port); - EXPECT_EQ(pAddr->port, 9101); - EXPECT_STREQ(pAddr->fqdn, "localhost"); - } - - { - SVgroupInfo* pInfo = &pRsp->vgroupInfo[1]; - pInfo->vgId = htonl(pInfo->vgId); - pInfo->hashBegin = htonl(pInfo->hashBegin); - pInfo->hashEnd = htonl(pInfo->hashEnd); - EXPECT_GT(pInfo->vgId, 0); - EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2); - EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); - EXPECT_EQ(pInfo->inUse, 0); - EXPECT_EQ(pInfo->numOfEps, 1); - SEpAddrMsg* pAddr = &pInfo->epAddr[0]; - pAddr->port = htons(pAddr->port); - EXPECT_EQ(pAddr->port, 9101); - EXPECT_STREQ(pAddr->fqdn, "localhost"); - } - } -} -#endif \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 0f10e853cd..0ecd42aada 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -80,7 +80,7 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT64(pRaw, dataPos, pStb->updateTime) SDB_SET_INT64(pRaw, dataPos, pStb->uid) SDB_SET_INT64(pRaw, dataPos, pStb->dbUid) - SDB_SET_INT64(pRaw, dataPos, pStb->version) + SDB_SET_INT32(pRaw, dataPos, pStb->version) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags) @@ -487,31 +487,37 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SStbInfoMsg *pInfo = pMsg->rpcMsg.pCont; + SMnode *pMnode = pMsg->pMnode; + STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; - mDebug("stb:%s, start to retrieve meta", pInfo->name); + mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); - SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->name); + SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("stb:%s, failed to retrieve meta since %s", pInfo->name, terrstr()); + mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); return -1; } - SStbObj *pStb = mndAcquireStb(pMnode, pInfo->name); + SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname); if (pStb == NULL) { mndReleaseDb(pMnode, pDb); terrno = TSDB_CODE_MND_INVALID_STB; - mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr()); + mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); return -1; } - int32_t contLen = sizeof(STableMetaMsg) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema); + taosRLockLatch(&pStb->lock); + int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; + int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + STableMetaMsg *pMeta = rpcMallocCont(contLen); if (pMeta == NULL) { + taosRUnLockLatch(&pStb->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseStb(pMnode, pStb); terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr()); + mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); return -1; } @@ -524,7 +530,7 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { pMeta->sversion = htonl(pStb->version); pMeta->suid = htonl(pStb->uid); - for (int32_t i = 0; i < pStb->numOfColumns; ++i) { + for (int32_t i = 0; i < totalCols; ++i) { SSchema *pSchema = &pMeta->pSchema[i]; SSchema *pSrcSchema = &pStb->pSchema[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); @@ -532,11 +538,14 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { pSchema->colId = htonl(pSrcSchema->colId); pSchema->bytes = htonl(pSrcSchema->bytes); } + taosRUnLockLatch(&pStb->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseStb(pMnode, pStb); pMsg->pCont = pMeta; pMsg->contLen = contLen; - mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->name, pStb->numOfColumns, pStb->numOfTags); + mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags); return 0; } @@ -553,7 +562,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs void *pIter = NULL; while (1) { SStbObj *pStb = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStb); + pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); if (pIter == NULL) break; if (strcmp(pStb->db, dbName) == 0) { @@ -610,6 +619,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } + pShow->numOfRows = sdbGetSize(pSdb, SDB_STB); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; strcpy(pMeta->tbFname, mndShowStr(pShow->type)); @@ -653,8 +663,8 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 cols = 0; - char stbName[TSDB_TABLE_FNAME_LEN] = {0}; - memcpy(stbName, pStb->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen); + char stbName[TSDB_TABLE_NAME_LEN] = {0}; + tstrncpy(stbName, pStb->name + prefixLen, TSDB_TABLE_NAME_LEN); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, stbName); cols++; @@ -664,11 +674,11 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pStb->numOfColumns; + *(int32_t *)pWrite = pStb->numOfColumns; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pStb->numOfTags; + *(int32_t *)pWrite = pStb->numOfTags; cols++; numOfRows++; diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 9d99b568a5..486c9718a2 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -39,7 +39,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 STableInfoMsg *bMsg = (STableInfoMsg *)*msg; - bMsg->msgHead.vgId = bInput->vgId; + bMsg->vgId = bInput->vgId; strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;