diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b9b1d3fe76..ba8bb7ca2f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -219,6 +219,26 @@ typedef struct { char data[]; } SMDCreateTableMsg; +// typedef struct { +// int32_t len; // one create table message +// char tableName[TSDB_TABLE_FNAME_LEN]; +// int16_t numOfColumns; +// int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string +// int8_t igExists; +// int8_t rspMeta; +// int8_t reserved[16]; +// char schema[]; +//} SCreateTableMsg; + +typedef struct { + char tableName[TSDB_TABLE_FNAME_LEN]; + int16_t numOfColumns; + int16_t numOfTags; + int8_t igExists; + int8_t rspMeta; + char schema[]; +} SCreateCTableMsg; + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; @@ -299,6 +319,18 @@ typedef struct SEpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SEpSet; +static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { + if(buf == NULL) return sizeof(SEpSet); + memcpy(buf, pEp, sizeof(SEpSet)); + //TODO: endian conversion + return sizeof(SEpSet); +} + +static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEpSet) { + memcpy(pEpSet, buf, sizeof(SEpSet)); + return buf; +} + typedef struct { int32_t acctId; int64_t clusterId; @@ -1056,26 +1088,94 @@ typedef struct STaskDropRsp { } STaskDropRsp; typedef struct { - int8_t igExists; - char* name; - char* phyPlan; + int8_t igExists; + char* name; + char* physicalPlan; + char* logicalPlan; } SCMCreateTopicReq; static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { int tlen = 0; tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeFixedI8(buf, pReq->igExists); - tlen += taosEncodeString(buf, pReq->phyPlan); + tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += taosEncodeString(buf, pReq->logicalPlan); return tlen; } static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeString(buf, &(pReq->phyPlan)); + buf = taosDecodeString(buf, &(pReq->physicalPlan)); + buf = taosDecodeString(buf, &(pReq->logicalPlan)); return buf; } +typedef struct { + int64_t topicId; +} SCMCreateTopicRsp; + +static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->topicId); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->topicId); + return buf; +} + +typedef struct { + char* topicName; + char* consumerGroup; + int64_t consumerId; +} SCMSubscribeReq; + +static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { + int tlen = 0; + tlen += taosEncodeString(buf, pReq->topicName); + tlen += taosEncodeString(buf, pReq->consumerGroup); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { + buf = taosDecodeString(buf, &pReq->topicName); + buf = taosDecodeString(buf, &pReq->consumerGroup); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + return buf; +} + +typedef struct { + int32_t vgId; + SEpSet pEpSet; +} SCMSubscribeRsp; + +static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pRsp->vgId); + tlen += taosEncodeSEpSet(buf, &pRsp->pEpSet); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { + buf = taosDecodeFixedI32(buf, &pRsp->vgId); + buf = taosDecodeSEpSet(buf, &pRsp->pEpSet); + return buf; +} + +typedef struct { + int64_t topicId; + int64_t consumerId; + int64_t consumerGroupId; + int64_t offset; +} SMVSubscribeReq; + +typedef struct { + int64_t newOffset; +} SMVSubscribeRsp; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2ed817fca1..76ee31d9c9 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -116,9 +116,10 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) @@ -149,6 +150,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) + TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) + + // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) @@ -158,4 +162,4 @@ enum { #if defined(TD_MSG_NUMBER_) TDMT_MAX #endif -}; \ No newline at end of file +}; diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 474e526186..f94090d7de 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -161,12 +161,14 @@ typedef enum { SDB_USER = 5, SDB_AUTH = 6, SDB_ACCT = 7, - SDB_TOPIC = 8, - SDB_VGROUP = 9, - SDB_STB = 10, - SDB_DB = 11, - SDB_FUNC = 12, - SDB_MAX = 13 + SDB_CONSUMER = 8, + SDB_CGROUP = 9, + SDB_TOPIC = 10, + SDB_VGROUP = 11, + SDB_STB = 12, + SDB_DB = 13, + SDB_FUNC = 14, + SDB_MAX = 15 } ESdbType; typedef struct SSdb SSdb; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 7d7cc73174..8f4b9e1807 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -62,6 +62,18 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); +/** + * Get a DB's all vgroup info. + * @param pCatalog (input, got with catalogGetHandle) + * @param pRpc (input, rpc object) + * @param pMgmtEps (input, mnode EPs) + * @param pDBName (input, full db name) + * @param forceUpdate (input, force update db vgroup info from mnode) + * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) + * @return error code + */ +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, int32_t forceUpdate, SArray** pVgroupList); + int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index a50c618be1..4ea35f1d2c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -75,6 +75,7 @@ typedef struct STableMeta { } STableMeta; typedef struct SDBVgroupInfo { + int32_t lock; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo diff --git a/include/util/thash.h b/include/util/thash.h index d0247a0729..f38ab50893 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -144,6 +144,16 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); */ void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* destBuf); +/** + * Clone the result to interval allocated buffer + * @param pHashObj + * @param key + * @param keyLen + * @param destBuf + * @return + */ +void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz); + /** * remove item with the specified key * @param pHashObj @@ -200,6 +210,26 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); */ int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); +/** + * return the payload data with the specified key(reference number added) + * + * @param pHashObj + * @param key + * @param keyLen + * @return + */ +void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen); + +/** + * release the prevous acquired obj + * + * @param pHashObj + * @param data + * @return + */ +void taosHashRelease(SHashObj *pHashObj, void *p); + + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 6ecd9a59f7..3d295830c7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -210,6 +210,9 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + +//temporary disabled until planner ready +#if 0 CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); //TODO: check sql valid @@ -219,15 +222,24 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq if(dagStr == NULL) { //TODO } +#endif SCMCreateTopicReq req = { .name = (char*)name, .igExists = 0, - .phyPlan = dagStr, + /*.physicalPlan = dagStr,*/ + .physicalPlan = (char*)sql, + .logicalPlan = "", }; - void* buf = NULL; - int tlen = tSerializeSCMCreateTopicReq(&buf, &req); + int tlen = tSerializeSCMCreateTopicReq(NULL, &req); + void* buf = malloc(tlen); + if(buf == NULL) { + goto _return; + } + void* abuf = buf; + tSerializeSCMCreateTopicReq(&abuf, &req); + /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; @@ -239,8 +251,6 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq tsem_wait(&pRequest->body.rspSem); - destroySendMsgInfo(body); - _return: qDestroyQuery(pQuery); qDestroyQueryDag(pDag); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index be001780ca..e88e7411bf 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,6 +49,7 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } +#if 0 TEST(testCase, connect_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -398,6 +399,7 @@ TEST(testCase, drop_stable_Test) { taos_free_result(pRes); taos_close(pConn); } +#endif //TEST(testCase, create_topic_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h new file mode 100644 index 0000000000..60f186d7d2 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_CONSUMER_H_ +#define _TD_MND_CONSUMER_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitConsumer(SMnode *pMnode); +void mndCleanupConsumer(SMnode *pMnode); + +SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); +void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer); + +SCGroupObj *mndAcquireCGroup(SMnode *pMnode, char *consumerGroup); +void mndReleaseCGroup(SMnode *pMnode, SCGroupObj *pCGroup); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_CONSUMER_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1a1306c3da..a874e67210 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -301,8 +301,34 @@ typedef struct { void* executor; int32_t sqlLen; char* sql; + char* logicalPlan; + char* physicalPlan; } STopicObj; +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + //uint64_t dbUid; + int32_t version; + SRWLatch lock; + +} SConsumerObj; + +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int64_t createTime; + int64_t updateTime; + uint64_t uid; + //uint64_t dbUid; + int32_t version; + SRWLatch lock; + +} SCGroupObj; + typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c new file mode 100644 index 0000000000..9e7cdbf09e --- /dev/null +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -0,0 +1,373 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mndConsumer.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndTopic.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tname.h" + +#define MND_CONSUMER_VER_NUMBER 1 +#define MND_CONSUMER_RESERVE_SIZE 64 + +static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer); +static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); +static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer); +static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer); +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pConsumer, SConsumerObj *pNewConsumer); +static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); +static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); + +int32_t mndInitConsumer(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_CONSUMER, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndConsumerActionEncode, + .decodeFp = (SdbDecodeFp)mndConsumerActionDecode, + .insertFp = (SdbInsertFp)mndConsumerActionInsert, + .updateFp = (SdbUpdateFp)mndConsumerActionUpdate, + .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); + mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp); + mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq); + mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupConsumer(SMnode *pMnode) {} + +static SSdbRaw *mndConsumerActionEncode(SConsumerObj *pConsumer) { + int32_t size = sizeof(SConsumerObj) + MND_CONSUMER_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_BINARY(pRaw, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); + SDB_SET_BINARY(pRaw, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); + SDB_SET_INT64(pRaw, dataPos, pConsumer->createTime); + SDB_SET_INT64(pRaw, dataPos, pConsumer->updateTime); + SDB_SET_INT64(pRaw, dataPos, pConsumer->uid); + /*SDB_SET_INT64(pRaw, dataPos, pConsumer->dbUid);*/ + SDB_SET_INT32(pRaw, dataPos, pConsumer->version); + + SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE); + SDB_SET_DATALEN(pRaw, dataPos); + + return pRaw; +} + +static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != MND_CONSUMER_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode consumer since %s", terrstr()); + return NULL; + } + + int32_t size = sizeof(SConsumerObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); + SConsumerObj *pConsumer = sdbGetRowObj(pRow); + if (pConsumer == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->name, TSDB_TABLE_FNAME_LEN); + SDB_GET_BINARY(pRaw, pRow, dataPos, pConsumer->db, TSDB_DB_FNAME_LEN); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->createTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->updateTime); + SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->uid); + /*SDB_GET_INT64(pRaw, pRow, dataPos, &pConsumer->dbUid);*/ + SDB_GET_INT32(pRaw, pRow, dataPos, &pConsumer->version); + + SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_CONSUMER_RESERVE_SIZE); + + return pRow; +} + +static int32_t mndConsumerActionInsert(SSdb *pSdb, SConsumerObj *pConsumer) { + mTrace("consumer:%s, perform insert action", pConsumer->name); + return 0; +} + +static int32_t mndConsumerActionDelete(SSdb *pSdb, SConsumerObj *pConsumer) { + mTrace("consumer:%s, perform delete action", pConsumer->name); + return 0; +} + +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SConsumerObj *pOldConsumer, SConsumerObj *pNewConsumer) { + mTrace("consumer:%s, perform update action", pOldConsumer->name); + atomic_exchange_32(&pOldConsumer->updateTime, pNewConsumer->updateTime); + atomic_exchange_32(&pOldConsumer->version, pNewConsumer->version); + + taosWLockLatch(&pOldConsumer->lock); + + // TODO handle update + + taosWUnLockLatch(&pOldConsumer->lock); + return 0; +} + +SConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { + SSdb *pSdb = pMnode->pSdb; + SConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); + if (pConsumer == NULL) { + /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + } + return pConsumer; +} + +void mndReleaseConsumer(SMnode *pMnode, SConsumerObj *pConsumer) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pConsumer); +} + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMSubscribeReq *pSubscribe; + tDeserializeSCMSubscribeReq(msgStr, pSubscribe); + // add consumerGroupId -> list to sdb + // add consumerId -> list to sdb + // add consumer -> list to sdb + return 0; +} + +static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg) { + mndTransProcessRsp(pMsg); + return 0; +} + +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; + + mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname); + +#if 0 + SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname); + if (pConsumer == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_INVALID_CONSUMER; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + taosRLockLatch(&pConsumer->lock); + int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; + int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + + STableMetaMsg *pMeta = rpcMallocCont(contLen); + if (pMeta == NULL) { + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN); + pMeta->numOfTags = htonl(pConsumer->numOfTags); + pMeta->numOfColumns = htonl(pConsumer->numOfColumns); + pMeta->precision = pDb->cfg.precision; + pMeta->tableType = TSDB_SUPER_TABLE; + pMeta->update = pDb->cfg.update; + pMeta->sversion = htonl(pConsumer->version); + pMeta->tuid = htonl(pConsumer->uid); + + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pMeta->pSchema[i]; + SSchema *pSrcSchema = &pConsumer->pSchema[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + + pMsg->pCont = pMeta; + pMsg->contLen = contLen; + + mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); +#endif + return 0; +} + +static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) { + SSdb *pSdb = pMnode->pSdb; + + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfConsumers = 0; + void *pIter = NULL; + while (1) { + SConsumerObj *pConsumer = NULL; + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) break; + + if (strcmp(pConsumer->db, dbName) == 0) { + numOfConsumers++; + } + + sdbRelease(pSdb, pConsumer); + } + + *pNumOfConsumers = numOfConsumers; + return 0; +} + +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) { + return -1; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tags"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htonl(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbFname, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SConsumerObj *pConsumer = NULL; + int32_t cols = 0; + char *pWrite; + char prefix[64] = {0}; + + tstrncpy(prefix, pShow->db, 64); + strcat(prefix, TS_PATH_DELIMITER); + int32_t prefixLen = (int32_t)strlen(prefix); + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); + if (pShow->pIter == NULL) break; + + if (strncmp(pConsumer->name, prefix, prefixLen) != 0) { + sdbRelease(pSdb, pConsumer); + continue; + } + + cols = 0; + + char consumerName[TSDB_TABLE_NAME_LEN] = {0}; + tstrncpy(consumerName, pConsumer->name + prefixLen, TSDB_TABLE_NAME_LEN); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, consumerName); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pConsumer->createTime; + cols++; + + /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ + /**(int32_t *)pWrite = pConsumer->numOfColumns;*/ + /*cols++;*/ + + /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ + /**(int32_t *)pWrite = pConsumer->numOfTags;*/ + /*cols++;*/ + + numOfRows++; + sdbRelease(pSdb, pConsumer); + } + + pShow->numOfReads += numOfRows; + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + return numOfRows; +} + +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 597142d326..49c96967e6 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -33,10 +33,7 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, STopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, STopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pTopic, STopicObj *pNewTopic); static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg); -static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); @@ -53,19 +50,8 @@ int32_t mndInitTopic(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTopicActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_TOPIC, mndProcessAlterTopicMsg); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicMsg); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_TOPIC_RSP, mndProcessCreateTopicInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_ALTER_TOPIC_RSP, mndProcessAlterTopicInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); - mndSetMsgHandle(pMnode, TDMT_VND_TABLE_META, mndProcessTopicMetaMsg); - - /*mndAddShowMetaHandle(pMnode, TSDB_MGMT_TOPIC, mndGetTopicMeta);*/ - /*mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TOPIC, mndRetrieveTopic);*/ - /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TOPIC, mndCancelGetNextTopic);*/ - - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); - mndSetMsgHandle(pMnode, TDMT_VND_CREATE_TOPIC_RSP, mndProcessCreateTopicInRsp); return sdbSetTable(pMnode->pSdb, table); } @@ -145,24 +131,9 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj atomic_exchange_32(&pOldTopic->version, pNewTopic->version); taosWLockLatch(&pOldTopic->lock); -#if 0 + + //TODO handle update - pOldTopic->numOfColumns = pNewTopic->numOfColumns; - pOldTopic->numOfTags = pNewTopic->numOfTags; - int32_t totalCols = pNewTopic->numOfTags + pNewTopic->numOfColumns; - int32_t totalSize = totalCols * sizeof(SSchema); - - if (pOldTopic->numOfTags + pOldTopic->numOfColumns < totalCols) { - void *pSchema = malloc(totalSize); - if (pSchema != NULL) { - free(pOldTopic->pSchema); - pOldTopic->pSchema = pSchema; - } - } - - memcpy(pOldTopic->pSchema, pNewTopic->pSchema, totalSize); - -#endif taosWUnLockLatch(&pOldTopic->lock); return 0; } @@ -191,41 +162,6 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } -static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { - int32_t totalCols = 0; - int32_t contLen = sizeof(SCreateTopicInternalMsg) + pTopic->execLen + pTopic->sqlLen; - - SCreateTopicInternalMsg *pCreate = calloc(1, contLen); - if (pCreate == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - pCreate->head.contLen = htonl(contLen); - pCreate->head.vgId = htonl(pVgroup->vgId); - memcpy(pCreate->name, pTopic->name, TSDB_TABLE_FNAME_LEN); - pCreate->tuid = htobe64(pTopic->uid); - pCreate->sverson = htonl(pTopic->version); - - pCreate->sql = malloc(pTopic->sqlLen); - if (pCreate->sql == NULL) { - free(pCreate); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - memcpy(pCreate->sql, pTopic->sql, pTopic->sqlLen); - - pCreate->executor = malloc(pTopic->execLen); - if (pCreate->executor == NULL) { - free(pCreate); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - memcpy(pCreate->executor, pTopic->executor, pTopic->execLen); - - return pCreate; -} - static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, STopicObj *pTopic) { int32_t contLen = sizeof(SDropTopicInternalMsg); @@ -243,109 +179,12 @@ static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgro return pDrop; } -static int32_t mndCheckCreateTopicMsg(SCreateTopicMsg *pCreate) { +static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { // deserialize and other stuff return 0; } -static int32_t mndSetCreateTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; - - return 0; -} - -static int32_t mndSetCreateTopicUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdbRaw *pUndoRaw = mndTopicActionEncode(pTopic); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; - - return 0; -} - -static int32_t mndSetCreateTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; - - return 0; -} - -static int32_t mndSetCreateTopicRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; - - SCreateTopicInternalMsg *pMsg = mndBuildCreateTopicMsg(pMnode, pVgroup, pTopic); - if (pMsg == NULL) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pMsg; - action.contLen = htonl(pMsg->head.contLen); - action.msgType = TDMT_VND_CREATE_TOPIC; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - return -1; - } - sdbRelease(pSdb, pVgroup); - } - - return 0; -} - -static int32_t mndSetCreateTopicUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, STopicObj *pTopic) { - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = NULL; - void *pIter = NULL; - - while (1) { - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) continue; - - SDropTopicInternalMsg *pMsg = mndBuildDropTopicMsg(pMnode, pVgroup, pTopic); - if (pMsg == NULL) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgroup); - action.pCont = pMsg; - action.contLen = sizeof(SDropTopicInternalMsg); - action.msgType = TDMT_VND_DROP_TOPIC; - if (mndTransAppendUndoAction(pTrans, &action) != 0) { - free(pMsg); - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pVgroup); - return -1; - } - sdbRelease(pSdb, pVgroup); - } - - return 0; -} - -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { STopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -355,66 +194,17 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg * topicObj.dbUid = pDb->uid; topicObj.version = 1; -#if 0 - int32_t totalCols = topicObj.numOfColumns + topicObj.numOfTags; - int32_t totalSize = totalCols * sizeof(SSchema); - topicObj.sql = malloc(totalSize); - if (topicObj.sql == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - memcpy(topicObj.sql, pCreate->sql, totalSize); -#endif - - int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } - mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name); - - if (mndSetCreateTopicRedoLogs(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicUndoLogs(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicCommitLogs(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicRedoActions(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndSetCreateTopicUndoActions(pMnode, pTrans, pDb, &topicObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_TOPIC_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - code = 0; - -CREATE_TOPIC_OVER: - mndTransDrop(pTrans); - return code; + SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); + if (pTopicRaw == NULL) return -1; + if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; + return sdbWrite(pMnode->pSdb, pTopicRaw); } static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SCreateTopicMsg *pCreate = pMsg->rpcMsg.pCont; + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMCreateTopicReq* pCreate; + tDeserializeSCMCreateTopicReq(msgStr, pCreate); mDebug("topic:%s, start to create", pCreate->name); @@ -436,15 +226,6 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { } } - // topic should have different name with stb - SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); - if (pStb != NULL) { - sdbRelease(pMnode->pSdb, pStb); - terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_STB; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pCreate->name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -464,144 +245,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndCheckAlterTopicMsg(SAlterTopicMsg *pAlter) { - SSchema *pSchema = &pAlter->schema; - pSchema->colId = htonl(pSchema->colId); - pSchema->bytes = htonl(pSchema->bytes); - - if (pSchema->type <= 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - if (pSchema->bytes <= 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - if (pSchema->name[0] == 0) { - terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; - return -1; - } - - return 0; -} - -static int32_t mndUpdateTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pOldTopic, STopicObj *pNewTopic) { return 0; } - -static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SAlterTopicMsg *pAlter = pMsg->rpcMsg.pCont; - - mDebug("topic:%s, start to alter", pAlter->name); - - if (mndCheckAlterTopicMsg(pAlter) != 0) { - mError("topic:%s, failed to alter since %s", pAlter->name, terrstr()); - return -1; - } - - STopicObj *pTopic = mndAcquireTopic(pMnode, pAlter->name); - if (pTopic == NULL) { - terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("topic:%s, failed to alter since %s", pAlter->name, terrstr()); - return -1; - } - - STopicObj topicObj = {0}; - memcpy(&topicObj, pTopic, sizeof(STopicObj)); - - int32_t code = mndUpdateTopic(pMnode, pMsg, pTopic, &topicObj); - mndReleaseTopic(pMnode, pTopic); - - if (code != 0) { - mError("topic:%s, failed to alter since %s", pAlter->name, tstrerror(code)); - return code; - } - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; -} - -static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); - return 0; -} - -static int32_t mndSetDropTopicRedoLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { - SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; - - return 0; -} - -static int32_t mndSetDropTopicUndoLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { - SSdbRaw *pUndoRaw = mndTopicActionEncode(pTopic); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) return -1; - - return 0; -} - -static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { - SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; - - return 0; -} - -static int32_t mndSetDropTopicRedoActions(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { return 0; } - -static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopicObj *pTopic) { return 0; } - static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - return -1; - } - mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); - - if (mndSetDropTopicRedoLogs(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicUndoLogs(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicCommitLogs(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicRedoActions(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndSetDropTopicUndoActions(pMnode, pTrans, pTopic) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_TOPIC_OVER; - } - - code = 0; - -DROP_TOPIC_OVER: - mndTransDrop(pTrans); return 0; } @@ -705,11 +349,6 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); - return 0; -} - static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 40943849f1..31b5939463 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -31,6 +31,11 @@ extern "C" { #define CTG_DEFAULT_INVALID_VERSION (-1) +enum { + CTG_READ = 1, + CTG_WRITE, +}; + typedef struct SVgroupListCache { int32_t vgroupVersion; SHashObj *cache; // key:vgId, value:SVgroupInfo @@ -41,6 +46,7 @@ typedef struct SDBVgroupCache { } SDBVgroupCache; typedef struct STableMetaCache { + SRWLatch stableLock; SHashObj *cache; //key:fulltablename, value:STableMeta SHashObj *stableCache; //key:suid, value:STableMeta* } STableMetaCache; @@ -71,6 +77,31 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define CTG_LOCK(type, _lock) do { \ + if (CTG_READ == (type)) { \ + if ((*(_lock)) < 0) assert(0); \ + taosRLockLatch(_lock); \ + ctgDebug("CTG RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) < 0) assert(0); \ + taosWLockLatch(_lock); \ + ctgDebug("CTG WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + +#define CTG_UNLOCK(type, _lock) do { \ + if (CTG_READ == (type)) { \ + if ((*(_lock)) <= 0) assert(0); \ + taosRUnLockLatch(_lock); \ + ctgDebug("CTG RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) <= 0) assert(0); \ + taosWUnLockLatch(_lock); \ + ctgDebug("CTG WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 46d23efeb4..b632ac772c 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -20,24 +20,28 @@ SCatalogMgmt ctgMgmt = {0}; -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) { if (NULL == pCatalog->dbCache.cache) { - *exist = 0; + *inCache = false; return TSDB_CODE_SUCCESS; } - SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (NULL == info) { - *exist = 0; + *inCache = false; return TSDB_CODE_SUCCESS; } - if (dbInfo) { - *dbInfo = *info; + CTG_LOCK(CTG_READ, &info->lock); + if (NULL == info->vgInfo) { + CTG_UNLOCK(CTG_READ, &info->lock); + *inCache = false; + return TSDB_CODE_SUCCESS; } - *exist = 1; + *dbInfo = info; + *inCache = true; return TSDB_CODE_SUCCESS; } @@ -80,46 +84,51 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); - STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName)); + *pTableMeta = NULL; - if (NULL == tbMeta) { + size_t sz = 0; + STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz); + + if (NULL == *pTableMeta) { *exist = 0; return TSDB_CODE_SUCCESS; } - if (tbMeta->tableType == TSDB_CHILD_TABLE) { - STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); - if (NULL == stbMeta || NULL == *stbMeta) { - *exist = 0; - return TSDB_CODE_SUCCESS; - } + *exist = 1; - if ((*stbMeta)->suid != tbMeta->suid) { - ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); - *pTableMeta = calloc(1, metaSize); - if (NULL == *pTableMeta) { - ctgError("calloc size[%d] failed", metaSize); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta)); - memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); - } else { - int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema); - *pTableMeta = calloc(1, metaSize); - if (NULL == *pTableMeta) { - ctgError("calloc size[%d] failed", metaSize); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - - memcpy(*pTableMeta, tbMeta, metaSize); + if (tbMeta->tableType != TSDB_CHILD_TABLE) { + return TSDB_CODE_SUCCESS; + } + + CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock); + + STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); + if (NULL == stbMeta || NULL == *stbMeta) { + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid); + tfree(*pTableMeta); + *exist = 0; + return TSDB_CODE_SUCCESS; } - *exist = 1; + if ((*stbMeta)->suid != tbMeta->suid) { + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + tfree(*pTableMeta); + ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); + *pTableMeta = realloc(*pTableMeta, metaSize); + if (NULL == *pTableMeta) { + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); + ctgError("calloc size[%d] failed", metaSize); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); + + CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock); return TSDB_CODE_SUCCESS; } @@ -223,9 +232,11 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) { SHashObj *vgroupHash = NULL; SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + int32_t code = 0; - *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); - if (NULL == *vgroupList) { + vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); + if (NULL == vgList) { ctgError("taosArrayInit failed"); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -234,19 +245,34 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet * while (pIter) { vgInfo = pIter; - if (NULL == taosArrayPush(*vgroupList, vgInfo)) { + if (NULL == taosArrayPush(vgList, vgInfo)) { ctgError("taosArrayPush failed"); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } pIter = taosHashIterate(dbInfo->vgInfo, pIter); vgInfo = NULL; } + *vgroupList = vgList; + vgList = NULL; + return TSDB_CODE_SUCCESS; + +_return: + + if (vgList) { + taosArrayDestroy(vgList); + } + + CTG_RET(code); } int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) { + int32_t code = 0; + + CTG_LOCK(CTG_READ, &dbInfo->lock); + int32_t vgNum = taosHashGetSize(dbInfo->vgInfo); char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); @@ -259,7 +285,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName tableNameHashFp fp = NULL; SVgroupInfo *vgInfo = NULL; - CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); + CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); @@ -279,19 +305,23 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName if (NULL == vgInfo) { ctgError("no hash range found for hashvalue[%u]", hashValue); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } *pVgroup = *vgInfo; - return TSDB_CODE_SUCCESS; +_return: + + CTG_UNLOCK(CTG_READ, &dbInfo->lock); + + CTG_RET(TSDB_CODE_SUCCESS); } int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - + int32_t exist = 0; if (!forceUpdate) { @@ -316,21 +346,23 @@ int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { + int32_t code = 0; + if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp"); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -338,50 +370,59 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("push ctable[%s] to table cache failed", output->ctbFname); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } } int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); - if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { - ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { - if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) { + CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { + CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + ctgError("push table[%s] to table cache failed", output->tbFname); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); + if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { + CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); + } else { + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { + ctgError("push table[%s] to table cache failed", output->tbFname); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } } + +_return: + tfree(output->tbMeta); - return TSDB_CODE_SUCCESS; + CTG_RET(code); } -int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { - if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - - int32_t exist = 0; - +int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) { + bool inCache = false; if (0 == forceUpdate) { - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); - if (exist) { + if (inCache) { return TSDB_CODE_SUCCESS; } } @@ -397,9 +438,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup)); - if (dbInfo) { - *dbInfo = DbOut.dbVgroup; - } + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); return TSDB_CODE_SUCCESS; } @@ -479,17 +518,68 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } - SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (NULL == dbInfo) { *version = CTG_DEFAULT_INVALID_VERSION; return TSDB_CODE_SUCCESS; } *version = dbInfo->vgVersion; + taosHashRelease(pCatalog->dbCache.cache, dbInfo); return TSDB_CODE_SUCCESS; } +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) { + if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + SDBVgroupInfo* db = NULL; + int32_t code = 0; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db)); + + vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo)); + if (NULL == vgList) { + ctgError("taosArrayInit failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + void *pIter = taosHashIterate(db->vgInfo, NULL); + while (pIter) { + vgInfo = pIter; + + if (NULL == taosArrayPush(vgList, vgInfo)) { + ctgError("taosArrayPush failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + pIter = taosHashIterate(db->vgInfo, pIter); + vgInfo = NULL; + } + + *vgroupList = vgList; + vgList = NULL; + +_return: + + if (db) { + CTG_UNLOCK(CTG_READ, &db->lock); + taosHashRelease(pCatalog->dbCache.cache, db); + } + + if (vgList) { + taosArrayDestroy(vgList); + vgList = NULL; + } + + CTG_RET(code); +} + + int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -497,13 +587,17 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB if (dbInfo->vgVersion < 0) { if (pCatalog->dbCache.cache) { - SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (oldInfo && oldInfo->vgInfo) { - taosHashCleanup(oldInfo->vgInfo); - oldInfo->vgInfo = NULL; - } + SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo) { + CTG_LOCK(CTG_WRITE, &oldInfo->lock); + if (oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } + CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); - taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)); + taosHashRelease(pCatalog->dbCache.cache, oldInfo); + } } ctgWarn("remove db [%s] from cache", dbName); @@ -517,10 +611,16 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } else { - SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); - if (oldInfo && oldInfo->vgInfo) { - taosHashCleanup(oldInfo->vgInfo); - oldInfo->vgInfo = NULL; + SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo) { + CTG_LOCK(CTG_WRITE, &oldInfo->lock); + if (oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } + CTG_UNLOCK(CTG_WRITE, &oldInfo->lock); + + taosHashRelease(pCatalog->dbCache.cache, oldInfo); } } @@ -573,7 +673,10 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S STableMeta *tbMeta = NULL; int32_t code = 0; SVgroupInfo vgroupInfo = {0}; - SDBVgroupInfo dbVgroup = {0}; + SDBVgroupInfo* dbVgroup = NULL; + SArray *vgList = NULL; + + *pVgroupList = NULL; CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta)); @@ -582,38 +685,48 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup)); if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList)); + CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList)); } else { int32_t vgId = tbMeta->vgId; - if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { + if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("vgId[%d] not found in vgroup list", vgId); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo)); - if (NULL == *pVgroupList) { + vgList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == vgList) { ctgError("taosArrayInit failed"); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) { + if (NULL == taosArrayPush(vgList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - } - tfree(tbMeta); - return TSDB_CODE_SUCCESS; + *pVgroupList = vgList; + vgList = NULL; + } _return: tfree(tbMeta); - taosArrayDestroy(*pVgroupList); + + if (dbVgroup) { + CTG_UNLOCK(CTG_READ, &dbVgroup->lock); + taosHashRelease(pCatalog->dbCache.cache, dbVgroup); + } + + if (vgList) { + taosArrayDestroy(vgList); + vgList = NULL; + } + CTG_RET(code); } int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { - SDBVgroupInfo dbInfo = {0}; + SDBVgroupInfo* dbInfo = NULL; int32_t code = 0; char db[TSDB_DB_FNAME_LEN] = {0}; @@ -621,12 +734,14 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo)); - if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { - ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", db, dbInfo.vgVersion, dbInfo.vgInfo); - CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); - } + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); - CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pTableName, pVgroup)); +_return: + + if (dbInfo) { + CTG_UNLOCK(CTG_READ, &dbInfo->lock); + taosHashRelease(pCatalog->dbCache.cache, dbInfo); + } CTG_RET(code); } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 07ca91729d..91927c370a 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -106,11 +106,11 @@ typedef struct SQWorkerMgmt { if (QW_READ == (type)) { \ if ((*(_lock)) < 0) assert(0); \ taosRLockLatch(_lock); \ - qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } else { \ if ((*(_lock)) < 0) assert(0); \ taosWLockLatch(_lock); \ - qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } \ } while (0) @@ -118,11 +118,11 @@ typedef struct SQWorkerMgmt { if (QW_READ == (type)) { \ if ((*(_lock)) <= 0) assert(0); \ taosRUnLockLatch(_lock); \ - qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } else { \ if ((*(_lock)) <= 0) assert(0); \ taosWUnLockLatch(_lock); \ - qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ } \ } while (0) diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index d343945a80..bf1774b45b 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -12,4 +12,6 @@ target_link_libraries( PUBLIC zlib PUBLIC lz4_static PUBLIC api -) \ No newline at end of file +) + +ADD_SUBDIRECTORY(test) diff --git a/source/util/src/thash.c b/source/util/src/thash.c index cfe14f00e1..840a1ef390 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -362,7 +362,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo return data; } -void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) { +void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void* d, bool acquire) { if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { return NULL; } @@ -404,6 +404,10 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen); } + if (acquire) { + pNode->count++; + } + data = GET_HASH_NODE_DATA(pNode); } @@ -415,6 +419,15 @@ void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* return data; } +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void* d) { + return taosHashGetCloneImpl(pHashObj, key, keyLen, d, false); +} + +void* taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) { + return taosHashGetCloneImpl(pHashObj, key, keyLen, NULL, true); +} + + int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, void *data, size_t dsize*/) { if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { return -1; @@ -919,3 +932,9 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { __rd_unlock(&pHashObj->lock, pHashObj->type); } + +void taosHashRelease(SHashObj *pHashObj, void *p) { + taosHashCancelIterate(pHashObj, p); +} + + diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index a60c6cff28..79aaa1beb0 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -13,17 +13,22 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR)) LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) ADD_EXECUTABLE(utilTest ${SOURCE_LIST}) - TARGET_LINK_LIBRARIES(utilTest tutil common os gtest pthread gcov) + TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov) + LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp) + LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp) + ADD_EXECUTABLE(hashTest ${SOURCE_LIST}) + TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov) + LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) ADD_EXECUTABLE(trefTest ${BIN_SRC}) - TARGET_LINK_LIBRARIES(trefTest common tutil) + TARGET_LINK_LIBRARIES(trefTest common util) ENDIF() #IF (TD_LINUX) # ADD_EXECUTABLE(trefTest ./trefTest.c) -# TARGET_LINK_LIBRARIES(trefTest tutil common) +# TARGET_LINK_LIBRARIES(trefTest util common) #ENDIF () INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) diff --git a/source/util/test/hashTest.cpp b/source/util/test/hashTest.cpp index bc3fed74c4..d31fcfb7ef 100644 --- a/source/util/test/hashTest.cpp +++ b/source/util/test/hashTest.cpp @@ -4,10 +4,15 @@ #include #include -#include "hash.h" +#include "thash.h" #include "taos.h" namespace { + +typedef struct TESTSTRUCT { + char *p; +}TESTSTRUCT; + // the simple test code for basic operations void simpleTest() { SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); @@ -141,6 +146,52 @@ void invalidOperationTest() { } +void acquireRleaseTest() { + SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + ASSERT_EQ(taosHashGetSize(hashTable), 0); + + int32_t key = 2; + int32_t code = 0; + int32_t num = 0; + TESTSTRUCT data = {0}; + char *str1 = "abcdefg"; + char *str2 = "aaaaaaa"; + char *str3 = "123456789"; + + data.p = (char *)malloc(10); + strcpy(data.p, str1); + + code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data)); + ASSERT_EQ(code, 0); + + TESTSTRUCT* pdata = (TESTSTRUCT*)taosHashAcquire(hashTable, &key, sizeof(key)); + ASSERT_TRUE(pdata != nullptr); + ASSERT_TRUE(strcmp(pdata->p, str1) == 0); + + code = taosHashRemove(hashTable, &key, sizeof(key)); + ASSERT_EQ(code, 0); + ASSERT_TRUE(strcmp(pdata->p, str1) == 0); + + num = taosHashGetSize(hashTable); + ASSERT_EQ(num, 1); + + strcpy(pdata->p, str3); + + data.p = (char *)malloc(10); + strcpy(data.p, str2); + code = taosHashPut(hashTable, &key, sizeof(key), &data, sizeof(data)); + ASSERT_EQ(code, 0); + num = taosHashGetSize(hashTable); + ASSERT_EQ(num, 2); + + printf("%s,expect:%s", pdata->p, str3); + ASSERT_TRUE(strcmp(pdata->p, str3) == 0); + + taosHashRelease(hashTable, pdata); + num = taosHashGetSize(hashTable); + ASSERT_EQ(num, 1); +} + } int main(int argc, char** argv) { @@ -153,4 +204,5 @@ TEST(testCase, hashTest) { stringKeyTest(); noLockPerformanceTest(); multithreadsTest(); + acquireRleaseTest(); }