From 2a450f67ddb36424c68f6553e4d29b5b7631f010 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Dec 2021 19:38:38 +0800 Subject: [PATCH 1/6] [td-11818] remove unused code. --- source/client/src/clientMsgHandler.c | 338 +-------------------------- 1 file changed, 3 insertions(+), 335 deletions(-) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f8fd69b89a..247468274e 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include -#include +#include "os.h" +#include "catalog.h" +#include "tname.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" #include "tmsgtype.h" #include "trpc.h" @@ -29,16 +29,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId); static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo); -static int32_t minMsgSize() { return tsRpcHeadSize + 100; } -static int32_t getWaitingTimeInterval(int32_t count) { - int32_t initial = 100; // 100 ms by default - if (count <= 1) { - return 0; - } - - return initial * ((2u)<<(count - 2)); -} - static int32_t vgIdCompare(const void *lhs, const void *rhs) { int32_t left = *(int32_t *)lhs; int32_t right = *(int32_t *)rhs; @@ -298,36 +288,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { taosReleaseRef(tscRefId, rid); } -int tscSendMsgToServer(SSqlObj *pSql) { - STscObj* pObj = pSql->pTscObj; - SSqlCmd* pCmd = &pSql->cmd; - - char *pMsg = rpcMallocCont(pCmd->payloadLen); - if (NULL == pMsg) { - tscError("0x%"PRIx64" msg:%s malloc failed", pSql->self, taosMsg[pSql->cmd.msgType]); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - // set the mgmt ip list - if (pSql->cmd.command >= TSDB_SQL_MGMT) { - tscDumpMgmtEpSet(pSql); - } - - memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - - SRpcMsg rpcMsg = { - .msgType = pSql->cmd.msgType, - .pCont = pMsg, - .contLen = pSql->cmd.payloadLen, - .ahandle = (void*)pSql->self, - .handle = NULL, - .code = 0 - }; - - rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); - return TSDB_CODE_SUCCESS; -} - // handle three situation // 1. epset retry, only return last failure ep // 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn @@ -354,176 +314,6 @@ void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) { } } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { - TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; - SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); - if (pSql == NULL) { - rpcFreeCont(rpcMsg->pCont); - return; - } - - assert(pSql->self == handle); - - STscObj *pObj = pSql->pTscObj; - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - pSql->rpcRid = -1; - if (pObj->signature != pObj) { - tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature); - - taosRemoveRef(tscObjRef, handle); - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; - } - - SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); - if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { - tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", - pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - - taosRemoveRef(tscObjRef, handle); - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; - } - - if (pEpSet) { - if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) { - if (pCmd->command < TSDB_SQL_MGMT) { - tscUpdateVgroupInfo(pSql, pEpSet); - } else { - tscUpdateMgmtEpSet(pSql, pEpSet); - } - } - } - - int32_t cmd = pCmd->command; - - // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema - if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - pSql->cmd.insertParam.schemaAttached = 1; - } - - // single table query error need to be handled here. - if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAG_VAL) && - (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { - - // 1. super table subquery - // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer - if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | - TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && - !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || - (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) { - // do nothing in case of super table subquery - } else { - pSql->retry += 1; - tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); - - pSql->res.code = rpcMsg->code; // keep the previous error code - if (pSql->retry > pSql->maxRetry) { - tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry); - } else { - // wait for a little bit moment and then retry - // todo do not sleep in rpc callback thread, add this process into queue to process - if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - int32_t duration = getWaitingTimeInterval(pSql->retry); - taosMsleep(duration); - } - - pSql->retryReason = rpcMsg->code; - rpcMsg->code = tscRenewTableMeta(pSql, 0); - // if there is an error occurring, proceed to the following error handling procedure. - if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; - } - } - } - } - - pRes->rspLen = 0; - - if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); - } else { - pRes->code = rpcMsg->code; - } - - if (pRes->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry); - pSql->retry = 0; - } - - if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) { - assert(rpcMsg->msgType == pCmd->msgType + 1); - pRes->code = rpcMsg->code; - pRes->rspType = rpcMsg->msgType; - pRes->rspLen = rpcMsg->contLen; - - if (pRes->rspLen > 0 && rpcMsg->pCont) { - char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); - if (tmp == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - } else { - pRes->pRsp = tmp; - memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); - } - } else { - tfree(pRes->pRsp); - } - - /* - * There is not response callback function for submit response. - * The actual inserted number of points is the first number. - */ - if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) { - SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp; - pMsg->code = htonl(pMsg->code); - pMsg->numOfRows = htonl(pMsg->numOfRows); - pMsg->affectedRows = htonl(pMsg->affectedRows); - pMsg->failedRows = htonl(pMsg->failedRows); - pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); - - pRes->numOfRows += pMsg->affectedRows; - tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], - tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); - } else { - tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); - } - } - - if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { - rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); - } - - bool shouldFree = tscShouldBeFreed(pSql); - if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - if (rpcMsg->code != TSDB_CODE_SUCCESS) { - pRes->code = rpcMsg->code; - } - - rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; - if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) { - tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64); - tscSetFqdnErrorMsg(pSql, pEpSet); - } - - (*pSql->fp)(pSql->param, pSql, rpcMsg->code); - } - - if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it - tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self); - taosRemoveRef(tscObjRef, handle); - } - - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); -} - int doBuildAndSendMsg(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -2987,51 +2777,6 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { return code; } -static void freeElem(void* p) { - tfree(*(char**)p); -} - -/** - * retrieve table meta from mnode, and then update the local table meta hashmap. - * @param pSql sql object - * @param tableIndex table index - * @return status code - */ -int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { - SSqlCmd* pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); - - char name[TSDB_TABLE_FNAME_LEN] = {0}; - int32_t code = tNameExtractFullName(&pTableMetaInfo->name, name); - if (code != TSDB_CODE_SUCCESS) { - tscError("0x%"PRIx64" failed to generate the table full name", pSql->self); - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pTableMeta) { - tscDebug("0x%"PRIx64" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRIu64, pSql->self, name, - tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid); - } - - - // remove stored tableMeta info in hash table - tscResetSqlCmd(pCmd, true, pSql->self); - - SArray* pNameList = taosArrayInit(1, POINTER_BYTES); - SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); - - char* n = strdup(name); - taosArrayPush(pNameList, &n); - code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); - taosArrayDestroyEx(pNameList, freeElem); - taosArrayDestroyEx(vgroupList, freeElem); - - return code; -} - static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); @@ -3044,53 +2789,6 @@ static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { return true; } -int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { - int32_t code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - if (allVgroupInfoRetrieved(pQueryInfo)) { - return TSDB_CODE_SUCCESS; - } - SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); - pNew->pTscObj = pSql->pTscObj; - pNew->signature = pNew; - - pNew->cmd.command = TSDB_SQL_STABLEVGROUP; - - // TODO TEST IT - SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd); - if (pNewQueryInfo == NULL) { - tscFreeSqlObj(pNew); - return code; - } - - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); - STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta); - tscAddTableMetaInfo(pNewQueryInfo, &pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables); - } - - if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { - tscFreeSqlObj(pNew); - return code; - } - - pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; - registerSqlObj(pNew); - - tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self); - - pSql->svgroupRid = pNew->self; - tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables); - - pNew->fp = tscTableMetaCallBack; - pNew->param = (void *)pSql->self; - code = tscBuildAndSendRequest(pNew, NULL); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; - } - - return code; -} - #endif int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { @@ -3205,36 +2903,6 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { } } -STableMeta* createTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) { - assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2); - - size_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); - STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); - - pTableMeta->tableType = pTableMetaMsg->tableType; - pTableMeta->vgId = pTableMetaMsg->vgId; - pTableMeta->suid = pTableMetaMsg->suid; - pTableMeta->uid = pTableMetaMsg->tuid; - - pTableMeta->tableInfo = (STableComInfo) { - .numOfTags = pTableMetaMsg->numOfTags, - .precision = pTableMetaMsg->precision, - .numOfColumns = pTableMetaMsg->numOfColumns, - }; - - pTableMeta->sversion = pTableMetaMsg->sversion; - pTableMeta->tversion = pTableMetaMsg->tversion; - - memcpy(pTableMeta->schema, pTableMetaMsg->pSchema, schemaSize); - - int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns; - for(int32_t i = 0; i < numOfTotalCols; ++i) { - pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; - } - - return pTableMeta; -} - int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { SShowRsp* pShow = (SShowRsp *)pMsg; pShow->showId = htonl(pShow->showId); From 3949f88b085f1f8be65a35fe94050533da8f13bb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 20 Dec 2021 20:30:55 +0800 Subject: [PATCH 2/6] minor changes --- include/util/taoserror.h | 4 ++++ source/dnode/mnode/impl/src/mndMnode.c | 8 ++++++-- source/dnode/mnode/impl/src/mndStb.c | 14 +++++++++----- source/dnode/mnode/impl/src/mndTrans.c | 8 ++++++-- source/dnode/mnode/impl/src/mndUser.c | 8 ++++++-- source/util/src/terror.c | 4 ++++ 6 files changed, 35 insertions(+), 11 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 95824df5ac..7e8df3add2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -230,6 +230,10 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5) #define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6) +// mnode-trans +#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) +#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) + // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401) diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 05c02ceac6..9331a0696a 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -61,8 +61,12 @@ int32_t mndInitMnode(SMnode *pMnode) { void mndCleanupMnode(SMnode *pMnode) {} static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_MNODE, &mnodeId); + SSdb *pSdb = pMnode->pSdb; + SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &mnodeId); + if (pObj == NULL) { + terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; + } + return pObj; } static void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pMnodeObj) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 822036b599..61a6cf7833 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -23,7 +23,7 @@ #include "mndUser.h" #include "tname.h" -#define TSDB_STB_VER_NUM 1 +#define TSDB_STB_VER_NUMBER 1 #define TSDB_STB_RESERVE_SIZE 64 static SSdbRaw *mndStbActionEncode(SStbObj *pStb); @@ -70,7 +70,7 @@ void mndCleanupStb(SMnode *pMnode) {} static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUM, size); + SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -103,7 +103,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != TSDB_STB_VER_NUM) { + if (sver != TSDB_STB_VER_NUMBER) { mError("failed to decode stable since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; @@ -176,8 +176,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb } SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_STB, stbName); + SSdb *pSdb = pMnode->pSdb; + SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName); + if (pStb == NULL) { + terrno = TSDB_CODE_MND_STB_NOT_EXIST; + } + return pStb; } void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c31a8c5b7d..b84ee5ec77 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -317,8 +317,12 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewT } STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_TRANS, &transId); + SSdb *pSdb = pMnode->pSdb; + STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId); + if (pTrans == NULL) { + terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; + } + return pTrans; } void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index eabf2df958..c96e6a80db 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -175,8 +175,12 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOldUser, SUserObj *pNe } SUserObj *mndAcquireUser(SMnode *pMnode, char *userName) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_USER, userName); + SSdb *pSdb = pMnode->pSdb; + SUserObj *pUser = sdbAcquire(pSdb, SDB_USER, userName); + if (pUser == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + } + return pUser; } void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ffc59c5d69..43ac760643 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -240,6 +240,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment" TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_CODE, "Invalid func code") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize") +// mnode-trans +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") + // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") From 3489d290fb5848a195837b54df6ef4e030af437a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Dec 2021 22:43:05 +0800 Subject: [PATCH 3/6] update index TFile write --- source/libs/index/inc/index_tfile.h | 6 +-- source/libs/index/src/index.c | 10 +++-- source/libs/index/src/index_cache.c | 2 +- source/libs/index/src/index_tfile.c | 55 ++++++++++++++++++++++++++-- source/libs/index/test/indexTests.cc | 9 ++++- 5 files changed, 70 insertions(+), 12 deletions(-) diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 979c0b0639..e6cb3a75c4 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -59,7 +59,7 @@ typedef struct TFileReader { typedef struct IndexTFile { char *path; - TFileReader *tb; + TFileCache *cache; TFileWriter *tw; } IndexTFile; @@ -79,14 +79,14 @@ typedef struct TFileReaderOpt { } TFileReaderOpt; // tfile cache -TFileCache *tfileCacheCreate(); +TFileCache *tfileCacheCreate(const char *path); void tfileCacheDestroy(TFileCache *tcache); TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key); void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader); TFileWriter *tfileWriterCreate(const char *suid, const char *colName); -IndexTFile *indexTFileCreate(); +IndexTFile *indexTFileCreate(const char *path); int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index ec83e84a3b..84b49493a2 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -333,13 +333,17 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType //refactor, merge interResults into fResults by oType SArray *first = taosArrayGetP(interResults, 0); taosArraySort(first, uidCompare); + taosArrayRemoveDuplicate(first, uidCompare, NULL); if (oType == MUST) { - + // just one column index, enhance later + taosArrayAddAll(fResults, first); } else if (oType == SHOULD) { - + // just one column index, enhance later + taosArrayAddAll(fResults, first); // tag1 condistion || tag2 condition } else if (oType == NOT) { - + // just one column index, enhance later + taosArrayAddAll(fResults, first); // not use currently } return 0; diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 5813c99164..ea185fefe5 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -147,7 +147,7 @@ int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t char *buf = calloc(1, keyLen); if (qtype == QUERY_TERM) { - + } else if (qtype == QUERY_PREFIX) { } else if (qtype == QUERY_SUFFIX) { diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 81a7f9f443..40936e1052 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -13,12 +13,39 @@ * along with this program. If not, see . */ +#include +#include #include "index_tfile.h" #include "index_fst.h" #include "index_util.h" +// tfile name suid-colId-version.tindex +static int tfileGetFileList(const char *path, SArray *result) { + DIR *dir = opendir(path); + if (NULL == dir) { return -1; } + struct dirent *entry; + while ((entry = readdir(dir)) != NULL) { + size_t len = strlen(entry->d_name); + char *buf = calloc(1, len + 1); + memcpy(buf, entry->d_name, len); + taosArrayPush(result, &buf); + } + closedir(dir); + return 0; +} +static int tfileCompare(const void *a, const void *b) { + const char *aName = *(char **)a; + const char *bName = *(char **)b; + size_t aLen = strlen(aName); + size_t bLen = strlen(bName); + return strncmp(aName, bName, aLen > bLen ? aLen : bLen); +} +static int tfileParseFileName(const char *filename, uint64_t *suid) { + + return 0; +} static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) { SERIALIZE_MEM_TO_BUF(buf, key, suid); SERIALIZE_VAR_TO_BUF(buf, '_', char); @@ -29,12 +56,22 @@ static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) { SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); } -TFileCache *tfileCacheCreate() { +TFileCache *tfileCacheCreate(const char *path) { TFileCache *tcache = calloc(1, sizeof(TFileCache)); if (tcache == NULL) { return NULL; } tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->capacity = 64; + + SArray *files = taosArrayInit(4, sizeof(void *)); + tfileGetFileList(path, files); + taosArraySort(files, tfileCompare); + for (size_t i = 0; i < taosArrayGetSize(files); i++) { + char *file = taosArrayGetP(files, i); + free((void *)file); + } + taosArrayDestroy(files); + return tcache; } void tfileCacheDestroy(TFileCache *tcache) { @@ -59,8 +96,11 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader) -IndexTFile *indexTFileCreate() { +IndexTFile *indexTFileCreate(const char *path) { IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); + tfile->cache = tfileCacheCreate(path); + + return tfile; } void IndexTFileDestroy(IndexTFile *tfile) { @@ -69,8 +109,15 @@ void IndexTFileDestroy(IndexTFile *tfile) { int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) { - IndexTFile *ptfile = (IndexTFile *)tfile; - + IndexTFile *pTfile = (IndexTFile *)tfile; + + SIndexTerm *term = query->term; + TFileCacheKey key = {.suid = term->suid, + .colType = term->colType, + .version = 0, + .colName = term->colName, + .nColName= term->nColName}; + TFileReader *reader = tfileCacheGet(pTfile->cache, &key); return 0; } int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 9baabb9610..85d76bb716 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -26,6 +26,7 @@ class FstWriter { public: FstWriter() { + _wc = writerCtxCreate(TFile, "/tmp/tindex", false, 0); _b = fstBuilderCreate(NULL, 0); } bool Put(const std::string &key, uint64_t val) { @@ -37,15 +38,19 @@ class FstWriter { ~FstWriter() { fstBuilderFinish(_b); fstBuilderDestroy(_b); + + writerCtxDestroy(_wc); } private: FstBuilder *_b; + WriterCtx *_wc; }; class FstReadMemory { public: FstReadMemory(size_t size) { - _w = fstCountingWriterCreate(NULL); + _wc = writerCtxCreate(TFile, "/tmp/tindex", true, 0); + _w = fstCountingWriterCreate(_wc); _size = size; memset((void *)&_s, 0, sizeof(_s)); } @@ -94,12 +99,14 @@ class FstReadMemory { fstCountingWriterDestroy(_w); fstDestroy(_fst); fstSliceDestroy(&_s); + writerCtxDestroy(_wc); } private: FstCountingWriter *_w; Fst *_fst; FstSlice _s; + WriterCtx *_wc; size_t _size; }; From 4f52726463454dbfa7d3aa6a38f7345501ac2186 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Dec 2021 23:12:51 +0800 Subject: [PATCH 4/6] update index TFile write --- source/libs/index/src/index_tfile.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 40936e1052..9a5b3251b1 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -42,9 +42,12 @@ static int tfileCompare(const void *a, const void *b) { size_t bLen = strlen(bName); return strncmp(aName, bName, aLen > bLen ? aLen : bLen); } -static int tfileParseFileName(const char *filename, uint64_t *suid) { - - return 0; +static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) { + if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) { + // read suid & colid & version success + return 0; + } + return -1; } static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) { SERIALIZE_MEM_TO_BUF(buf, key, suid); @@ -68,6 +71,12 @@ TFileCache *tfileCacheCreate(const char *path) { taosArraySort(files, tfileCompare); for (size_t i = 0; i < taosArrayGetSize(files); i++) { char *file = taosArrayGetP(files, i); + uint64_t suid; + int colId, version; + if (0 != tfileParseFileName(file, &suid, &colId, &version)) { + // invalid file, just skip + continue; + } free((void *)file); } taosArrayDestroy(files); From 97128cbf1fb903ac136ac047016aea36d9bae200 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Dec 2021 10:11:30 +0800 Subject: [PATCH 5/6] [td-11818] support create account and drop account. --- source/client/inc/clientInt.h | 47 +++++++++++++++------------- source/client/src/clientImpl.c | 21 +++++++------ source/client/src/clientMain.c | 11 ++----- source/client/src/clientMsgHandler.c | 31 +++++++----------- source/client/src/tscEnv.c | 7 ++--- source/libs/parser/inc/astToMsg.h | 1 + source/libs/parser/src/astToMsg.c | 44 ++++++++++++++++++++++++++ source/libs/parser/src/astValidate.c | 36 +++++++++++++++++++++ 8 files changed, 135 insertions(+), 63 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 4e7fff06b6..a1a9155d16 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -86,26 +86,32 @@ typedef struct STscObj { SAppInstInfo *pAppInfo; } STscObj; -typedef struct SClientResultInfo { - const char *pMsg; +typedef struct SReqResultInfo { + const char *pRspMsg; const char *pData; TAOS_FIELD *fields; - int32_t numOfCols; - int32_t numOfRows; - int32_t current; + uint32_t numOfCols; + int32_t *length; TAOS_ROW row; char **pCol; -} SClientResultInfo; -typedef struct SReqBody { - tsem_t rspSem; // not used now - void* fp; - void* param; - int32_t paramLen; - int64_t execId; // showId/queryId - SClientResultInfo* pResInfo; -} SRequestBody; + uint32_t numOfRows; + uint32_t current; +} SReqResultInfo; + +typedef struct SReqMsg { + void *pMsg; + uint32_t len; +} SReqMsgInfo; + +typedef struct SRequestSendRecvBody { + tsem_t rspSem; // not used now + void* fp; + int64_t execId; // showId/queryId + SReqMsgInfo requestMsg; + SReqResultInfo resInfo; +} SRequestSendRecvBody; #define ERROR_MSG_BUF_DEFAULT_SIZE 512 @@ -115,7 +121,7 @@ typedef struct SRequestObj { STscObj *pTscObj; SQueryExecMetric metric; char *sqlstr; // sql string - SRequestBody body; + SRequestSendRecvBody body; int64_t self; char *msgBuf; int32_t code; @@ -123,11 +129,10 @@ typedef struct SRequestObj { } SRequestObj; typedef struct SRequestMsgBody { - int32_t msgType; - void *pData; - int32_t msgLen; - uint64_t requestId; - uint64_t requestObjRefId; + int32_t msgType; + SReqMsgInfo msgInfo; + uint64_t requestId; + uint64_t requestObjRefId; } SRequestMsgBody; extern SAppInfo appInfo; @@ -158,7 +163,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); void* doFetchRow(SRequestObj* pRequest); -void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9532789ca0..bab1f9cc9b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -155,8 +155,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE); if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW || type == TSDB_SQL_DROP_USER || type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_CREATE_DB || type == TSDB_SQL_CREATE_ACCT) { pRequest->type = type; - pRequest->body.param = output; - pRequest->body.paramLen = outputLen; + pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = output, .len = outputLen}; SRequestMsgBody body = {0}; buildRequestMsgFp[type](pRequest, &body); @@ -165,6 +164,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); tsem_wait(&pRequest->body.rspSem); + + destroyRequestMsgBody(&body); } else { assert(0); @@ -255,7 +256,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgLen = sizeof(SConnectMsg); + pMsgBody->msgInfo.len = sizeof(SConnectMsg); pMsgBody->requestObjRefId = pRequest->self; SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); @@ -279,28 +280,28 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) pConnect->startTime = htobe64(appInfo.startTime); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - pMsgBody->pData = pConnect; + pMsgBody->msgInfo.pMsg = pConnect; return 0; } static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { assert(pMsgBody != NULL); - tfree(pMsgBody->pData); + tfree(pMsgBody->msgInfo.pMsg); } int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) { - char *pMsg = rpcMallocCont(pBody->msgLen); + char *pMsg = rpcMallocCont(pBody->msgInfo.len); if (NULL == pMsg) { tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return -1; } - memcpy(pMsg, pBody->pData, pBody->msgLen); + memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len); SRpcMsg rpcMsg = { .msgType = pBody->msgType, .pCont = pMsg, - .contLen = pBody->msgLen, + .contLen = pBody->msgInfo.len, .ahandle = (void*) pBody->requestObjRefId, .handle = NULL, .code = 0 @@ -388,7 +389,7 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c void* doFetchRow(SRequestObj* pRequest) { assert(pRequest != NULL); - SClientResultInfo* pResultInfo = pRequest->body.pResInfo; + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { pRequest->type = TSDB_SQL_RETRIEVE_MNODE; @@ -421,7 +422,7 @@ void* doFetchRow(SRequestObj* pRequest) { return pResultInfo->row; } -void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { +void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { return; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 21e632db8d..8a75799ed7 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -115,12 +115,7 @@ int taos_field_count(TAOS_RES *res) { } SRequestObj* pRequest = (SRequestObj*) res; - - SClientResultInfo* pResInfo = pRequest->body.pResInfo; - if (pResInfo == NULL) { - return 0; - } - + SReqResultInfo* pResInfo = &pRequest->body.resInfo; return pResInfo->numOfCols; } @@ -133,7 +128,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { return NULL; } - SClientResultInfo* pResInfo = ((SRequestObj*) res)->body.pResInfo; + SReqResultInfo* pResInfo = &(((SRequestObj*) res)->body.resInfo); return pResInfo->fields; } @@ -248,7 +243,7 @@ int* taos_fetch_lengths(TAOS_RES *res) { return NULL; } - return ((SRequestObj*) res)->body.pResInfo->length; + return ((SRequestObj*) res)->body.resInfo.length; } const char *taos_data_type(int type) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 247468274e..e70dcc63b3 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -2793,7 +2793,7 @@ static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; - pMsgBody->msgLen = sizeof(SConnectMsg); + pMsgBody->msgInfo.len = sizeof(SConnectMsg); pMsgBody->requestObjRefId = pRequest->self; SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); @@ -2817,7 +2817,7 @@ int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pConnect->startTime = htobe64(appInfo.startTime); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); - pMsgBody->pData = pConnect; + pMsgBody->msgInfo.pMsg = pConnect; return 0; } @@ -2858,17 +2858,14 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); - pRequest->body.pResInfo->pMsg = pMsg; - + pRequest->body.resInfo.pRspMsg = pMsg; tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns); return 0; } int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->requestObjRefId = pRequest->self; - pMsgBody->msgLen = pRequest->body.paramLen; - pMsgBody->pData = pRequest->body.param; + pMsgBody->msgInfo = pRequest->body.requestMsg; switch(pRequest->type) { case TSDB_SQL_CREATE_USER: @@ -2886,7 +2883,7 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { case TSDB_SQL_CREATE_DB: { pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_DB; - SCreateDbMsg* pCreateMsg = pRequest->body.param; + SCreateDbMsg* pCreateMsg = pRequest->body.requestMsg.pMsg; SName name = {0}; int32_t ret = tNameSetDbName(&name, pRequest->pTscObj->acctId, pCreateMsg->db, strnlen(pCreateMsg->db, tListLen(pCreateMsg->db))); if (ret != TSDB_CODE_SUCCESS) { @@ -2925,12 +2922,8 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) pFields[i].bytes = pSchema[i].bytes; } - if (pRequest->body.pResInfo == NULL) { - pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); - } - - pRequest->body.pResInfo->pMsg = pMsg; - SClientResultInfo* pResInfo = pRequest->body.pResInfo; + pRequest->body.resInfo.pRspMsg = pMsg; + SReqResultInfo* pResInfo = &pRequest->body.resInfo; pResInfo->fields = pFields; pResInfo->numOfCols = pMetaMsg->numOfColumns; @@ -2944,27 +2937,27 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - pMsgBody->msgLen = sizeof(SRetrieveTableMsg); + pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg); pMsgBody->requestObjRefId = pRequest->self; SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); pRetrieveMsg->showId = htonl(pRequest->body.execId); - pMsgBody->pData = pRetrieveMsg; + pMsgBody->msgInfo.pMsg = pRetrieveMsg; return TSDB_CODE_SUCCESS; } int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { assert(msgLen >= sizeof(SRetrieveTableRsp)); - tfree(pRequest->body.pResInfo->pMsg); - pRequest->body.pResInfo->pMsg = pMsg; + tfree(pRequest->body.resInfo.pRspMsg); + pRequest->body.resInfo.pRspMsg = pMsg; SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg; pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->precision = htons(pRetrieve->precision); - SClientResultInfo* pResInfo = pRequest->body.pResInfo; + SReqResultInfo* pResInfo = &pRequest->body.resInfo; pResInfo->numOfRows = pRetrieve->numOfRows; pResInfo->pData = pRetrieve->data; // todo fix this in async model diff --git a/source/client/src/tscEnv.c b/source/client/src/tscEnv.c index 76c37ca2f1..182e330df7 100644 --- a/source/client/src/tscEnv.c +++ b/source/client/src/tscEnv.c @@ -170,7 +170,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty pRequest->type = type; pRequest->pTscObj = pObj; pRequest->body.fp = fp; - pRequest->body.param = param; +// pRequest->body.requestMsg. = param; pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); tsem_init(&pRequest->body.rspSem, 0, 0); @@ -188,10 +188,7 @@ static void doDestroyRequest(void* p) { tfree(pRequest->sqlstr); tfree(pRequest->pInfo); - if (pRequest->body.pResInfo != NULL) { - tfree(pRequest->body.pResInfo->pMsg); - tfree(pRequest->body.pResInfo); - } + tfree(pRequest->body.resInfo.pRspMsg); deregisterRequest(pRequest); tfree(pRequest); diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index 32906f7800..1771bdc0ed 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -5,6 +5,7 @@ #include "taosmsg.h" SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); +SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, char* msgBuf, int32_t msgLen); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 3f9d86737f..42ffbf3c06 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -24,6 +24,50 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in return pMsg; } +SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) { + SCreateAcctMsg* pMsg = (SCreateAcctMsg*)calloc(1, sizeof(SCreateAcctMsg)); + if (pMsg == NULL) { + // tscError("0x%" PRIx64 " failed to malloc for query msg", id); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + SCreateAcctMsg *pCreateMsg = (SCreateAcctMsg *) calloc(1, sizeof(SCreateAcctMsg)); + + SToken *pName = &pInfo->pMiscInfo->user.user; + SToken *pPwd = &pInfo->pMiscInfo->user.passwd; + + strncpy(pCreateMsg->user, pName->z, pName->n); + strncpy(pCreateMsg->pass, pPwd->z, pPwd->n); + + SCreateAcctInfo *pAcctOpt = &pInfo->pMiscInfo->acctOpt; + + pCreateMsg->maxUsers = htonl(pAcctOpt->maxUsers); + pCreateMsg->maxDbs = htonl(pAcctOpt->maxDbs); + pCreateMsg->maxTimeSeries = htonl(pAcctOpt->maxTimeSeries); + pCreateMsg->maxStreams = htonl(pAcctOpt->maxStreams); +// pCreateMsg->maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond); + pCreateMsg->maxStorage = htobe64(pAcctOpt->maxStorage); +// pCreateMsg->maxQueryTime = htobe64(pAcctOpt->maxQueryTime); +// pCreateMsg->maxConnections = htonl(pAcctOpt->maxConnections); + + if (pAcctOpt->stat.n == 0) { + pCreateMsg->accessState = -1; + } else { + if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) { + pCreateMsg->accessState = TSDB_VN_READ_ACCCESS; + } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) { + pCreateMsg->accessState = TSDB_VN_WRITE_ACCCESS; + } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { + pCreateMsg->accessState = TSDB_VN_ALL_ACCCESS; + } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { + pCreateMsg->accessState = 0; + } + } + + *outputLen = sizeof(SCreateAcctMsg); + return pMsg; +} SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, char* msgBuf, int32_t msgBufLen) { SToken* pName = taosArrayGet(pInfo->pMiscInfo->a, 0); if (pName->n >= TSDB_USER_LEN) { diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index ba96c4a796..accc786bf6 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4228,6 +4228,42 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in break; } + case TSDB_SQL_CREATE_ACCT: + case TSDB_SQL_ALTER_ACCT: { + const char* msg1 = "invalid state option, available options[no, r, w, all]"; + const char* msg2 = "invalid user/account name"; + const char* msg3 = "name too long"; + + SToken* pName = &pInfo->pMiscInfo->user.user; + SToken* pPwd = &pInfo->pMiscInfo->user.passwd; + + if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + if (pName->n >= TSDB_USER_LEN) { + return buildInvalidOperationMsg(pMsgBuf, msg3); + } + + if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg2); + } + + SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; + if (pAcctOpt->stat.n > 0) { + if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) { + } else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) { + } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { + } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { + } else { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + } + + *output = buildAcctManipulationMsg(pInfo, outputLen, id, msgBuf, msgBufLen); + break; + } + case TSDB_SQL_DROP_ACCT: case TSDB_SQL_DROP_USER: { *output = buildDropUserMsg(pInfo, outputLen, id, msgBuf, msgBufLen); From 546b0f10cbbe5ee904c9d1c3b2e313410d18ec3d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 21 Dec 2021 10:27:10 +0800 Subject: [PATCH 6/6] minor changes --- source/dnode/mnode/impl/src/mndAcct.c | 2 +- source/dnode/mnode/impl/src/mndCluster.c | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 426e8500f2..4c2706f481 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -71,7 +71,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { } static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, TSDB_ACCT_VER_NUMBER, sizeof(SAcctObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, TSDB_ACCT_VER_NUMBER, sizeof(SAcctObj) + TSDB_ACCT_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 7e9a12e889..cf8511c054 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -63,7 +63,7 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) { } static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 9331a0696a..869b6e538b 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -102,7 +102,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { } static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pMnodeObj) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, TSDB_MNODE_VER_NUMBER, sizeof(SMnodeObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, TSDB_MNODE_VER_NUMBER, sizeof(SMnodeObj) + TSDB_MNODE_RESERVE_SIZE); if (pRaw == NULL) return NULL; int32_t dataPos = 0;