From c877418df00f36c1621a6ccc2e2adf65e0457611 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 00:31:23 +0000 Subject: [PATCH 01/16] support multi thread update mnode and dnode --- src/client/inc/tsclient.h | 2 +- src/client/src/tscServer.c | 78 ++++++++++++++++++++++++++------------ src/client/src/tscSql.c | 2 +- src/client/src/tscSystem.c | 3 +- src/client/src/tscUtil.c | 17 +++++---- src/inc/trpc.h | 5 +++ 6 files changed, 72 insertions(+), 35 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index be82eb64a8..e1180c4143 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -457,7 +457,7 @@ extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; extern int tscNumOfThreads; -extern SRpcIpSet tscMgmtIpSet; +extern SRpcCorIpSet tscMgmtIpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d8af6d5c87..e8a03a6482 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -29,7 +29,7 @@ #define TSC_MGMT_VNODE 999 -SRpcIpSet tscMgmtIpSet; +SRpcCorIpSet tscMgmtIpSet; SRpcIpSet tscDnodeIpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; @@ -44,6 +44,42 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } +static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { + taosCorBeginRead(&tscMgmtIpSet.version); + *ipSet = tscMgmtIpSet.ipSet; + taosCorEndRead(&tscMgmtIpSet.version); +} + +bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { + if (s1->numOfIps != s2->numOfIps + || s1->inUse != s1->inUse) { + return false; + } + for (int32_t i = 0; i < s1->numOfIps; i++) { + if (s1->port[i] != s2->port[i] + || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) + return false; + } + return true; +} +void tscSetMgmtIpList(SRpcIpSet *pIpSet) { + // no need to update if equal + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + if (tscIpSetIsEqual(&dump, pIpSet)) { + return; + } + + taosCorBeginWrite(&tscMgmtIpSet.version); + SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; + mgmtIpSet->numOfIps = pIpSet->numOfIps; + mgmtIpSet->inUse = pIpSet->inUse; + for (int32_t i = 0; i < mgmtIpSet->numOfIps; ++i) { + mgmtIpSet->port[i] = htons(pIpSet->port[i]); + strncpy(mgmtIpSet->fqdn[i], pIpSet->fqdn[i], TSDB_FQDN_LEN); + } + taosCorEndWrite(&tscMgmtIpSet.version); +} static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { SRpcIpSet* pIpList = &pSql->ipList; pIpList->inUse = 0; @@ -60,31 +96,17 @@ static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { } void tscPrintMgmtIp() { - if (tscMgmtIpSet.numOfIps <= 0) { - tscError("invalid mnode IP list:%d", tscMgmtIpSet.numOfIps); + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + if (dump.numOfIps <= 0) { + tscError("invalid mnode IP list:%d", dump.numOfIps); } else { - for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("mnode index:%d %s:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); + for (int i = 0; i < dump.numOfIps; ++i) { + tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); } } } -void tscSetMgmtIpList(SRpcIpSet *pIpList) { - tscMgmtIpSet.numOfIps = pIpList->numOfIps; - tscMgmtIpSet.inUse = pIpList->inUse; - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscMgmtIpSet.port[i] = htons(pIpList->port[i]); - } -} - -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - tscMgmtIpSet = *pIpSet; - tscDebug("mnode IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse); - for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { - tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtIpSet.fqdn[i], tscMgmtIpSet.port[i]); - } -} - /* * For each management node, try twice at least in case of poor network situation. * If the client start to connect to a non-management node from the client, and the first retry may fail due to @@ -95,7 +117,9 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { UNUSED_FUNC static int32_t tscGetMgmtConnMaxRetryTimes() { int32_t factor = 2; - return tscMgmtIpSet.numOfIps * factor; + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + return dump.numOfIps * factor; } void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { @@ -185,7 +209,9 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - pSql->ipList = tscMgmtIpSet; + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + pSql->ipList = dump; } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -239,7 +265,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { if (pCmd->command < TSDB_SQL_MGMT) { if (pIpSet) pSql->ipList = *pIpSet; } else { - if (pIpSet) tscMgmtIpSet = *pIpSet; + if (pIpSet) tscSetMgmtIpList(pIpSet); } if (rpcMsg->pCont == NULL) { @@ -421,7 +447,9 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - pSql->ipList = tscMgmtIpSet; + SRpcIpSet dump; + tscDumpMgmtIpSet(&dump); + pSql->ipList = dump; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 0677463d8d..b24cfd9b27 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con if (ip) { if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; - if (port) tscMgmtIpSet.port[0] = port; + if (port) tscMgmtIpSet.ipSet.port[0] = port; } void *pDnodeConn = NULL; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 82cc8cc225..1a78ed87bd 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -41,7 +41,8 @@ int tscNumOfThreads; static pthread_once_t tscinit = PTHREAD_ONCE_INIT; void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); -void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +//void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); + void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) { taosGetDisk(); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 3e0fe0b4be..b0e6c727eb 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) { } int tscSetMgmtIpListFromCfg(const char *first, const char *second) { - tscMgmtIpSet.numOfIps = 0; - tscMgmtIpSet.inUse = 0; + // init mgmt ip set + tscMgmtIpSet.version = 0; + SRpcIpSet *mgmtIpSet = &(tscMgmtIpSet.ipSet); + mgmtIpSet->numOfIps = 0; + mgmtIpSet->inUse = 0; if (first && first[0] != 0) { if (strlen(first) >= TSDB_EP_LEN) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(first, mgmtIpSet->fqdn[mgmtIpSet->numOfIps], &(mgmtIpSet->port[mgmtIpSet->numOfIps])); + mgmtIpSet->numOfIps++; } if (second && second[0] != 0) { @@ -2163,11 +2166,11 @@ int tscSetMgmtIpListFromCfg(const char *first, const char *second) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } - taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; + taosGetFqdnPortFromEp(second, mgmtIpSet->fqdn[mgmtIpSet->numOfIps], &(mgmtIpSet->port[mgmtIpSet->numOfIps])); + mgmtIpSet->numOfIps++; } - if ( tscMgmtIpSet.numOfIps == 0) { + if (mgmtIpSet->numOfIps == 0) { terrno = TSDB_CODE_TSC_INVALID_FQDN; return -1; } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index d1adfb7494..b159155e9d 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,6 +35,11 @@ typedef struct SRpcIpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcIpSet; +typedef struct SRpcCorIpSet { + int32_t version; + SRpcIpSet ipSet; +} SRpcCorIpSet; + typedef struct SRpcConnInfo { uint32_t clientIp; uint16_t clientPort; From 3d10b6f81d7d0f301bc3dd42405bf4f3869ab723 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 04:25:37 +0000 Subject: [PATCH 02/16] support multi thread update dnode --- src/client/src/tscSchemaUtil.c | 3 ++ src/client/src/tscServer.c | 67 +++++++++++++++++++++------------- src/inc/taosmsg.h | 1 + 3 files changed, 45 insertions(+), 26 deletions(-) diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 934a562387..b391a5760e 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -157,6 +157,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; + //init version here + pTableMeta->vgroupInfo.version = 0; + pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e8a03a6482..4a11288a85 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -51,8 +51,7 @@ static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { } bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { - if (s1->numOfIps != s2->numOfIps - || s1->inUse != s1->inUse) { + if (s1->numOfIps != s2->numOfIps /*|| s1->inUse != s1->inUse*/) { return false; } for (int32_t i = 0; i < s1->numOfIps; i++) { @@ -80,21 +79,38 @@ void tscSetMgmtIpList(SRpcIpSet *pIpSet) { } taosCorEndWrite(&tscMgmtIpSet.version); } -static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { - SRpcIpSet* pIpList = &pSql->ipList; - pIpList->inUse = 0; - if (pVgroupInfo == NULL) { - pIpList->numOfIps = 0; +static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) { + if (pVgroupInfo == NULL) { return;} + taosCorBeginRead(&pVgroupInfo->version); + if (vgId) { + *vgId = pVgroupInfo->vgId; + } + pIpSet->inUse = 0; + pIpSet->numOfIps = pVgroupInfo->numOfIps; + for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { + strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); + pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; + } + taosCorEndRead(&pVgroupInfo->version); +} +static void tscSetVgroupInfoWithIpSet(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { + taosCorBeginWrite(&pVgroupInfo->version); + //TODO(dengyihao), dont care vgid + pVgroupInfo->numOfIps = pIpSet->numOfIps; + for (int32_t i = 0; pVgroupInfo->numOfIps; i++) { + strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN); + pVgroupInfo->ipAddr[i].port = pIpSet->port[i]; + } + taosCorEndWrite(&pVgroupInfo->version); +} +static void tscSetVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { + if (tscIpSetIsEqual(&pObj->ipList, pIpSet)) { return; } - - pIpList->numOfIps = pVgroupInfo->numOfIps; - for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); - pIpList->port[i] = pVgroupInfo->ipAddr[i].port; - } + SSqlCmd *pCmd = &pObj->cmd; + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + tscSetVgroupInfoWithIpSet(&pTableMetaInfo->pTableMeta->vgroupInfo, pIpSet); } - void tscPrintMgmtIp() { SRpcIpSet dump; tscDumpMgmtIpSet(&dump); @@ -263,7 +279,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { } if (pCmd->command < TSDB_SQL_MGMT) { - if (pIpSet) pSql->ipList = *pIpSet; + if (pIpSet) tscSetVgroupInfo(pSql, pIpSet); } else { if (pIpSet) tscSetMgmtIpList(pIpSet); } @@ -553,7 +569,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); + tscDumpIpSetFromVgroupInfo(&pTableMeta->vgroupInfo, &pSql->ipList, NULL); tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->ipList.numOfIps); @@ -595,11 +611,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - - tscSetDnodeIpList(pSql, pVgroupInfo); - if (pVgroupInfo != NULL) { - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - } + int32_t vgId = 0; + tscDumpIpSetFromVgroupInfo(pVgroupInfo, &pSql->ipList, &vgId); + pQueryMsg->head.vgId = htonl(vgId); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); @@ -618,8 +632,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); - pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); + int32_t vgId = 0; + tscDumpIpSetFromVgroupInfo(&pTableIdList->vgInfo, &pSql->ipList, &vgId); + pQueryMsg->head.vgId = htonl(vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables @@ -1351,7 +1366,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); + tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->vgroupInfo, &pSql->ipList, NULL); return TSDB_CODE_SUCCESS; } @@ -1875,11 +1890,11 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { memcpy(pInfo->vgroupList, pVgroupInfo, size); for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { + //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - + pVgroups->version = 0; pVgroups->vgId = htonl(pVgroups->vgId); assert(pVgroups->numOfIps >= 1); - for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b7afaf1e06..c12aed2bcc 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -647,6 +647,7 @@ typedef struct SCMSTableVgroupMsg { } SCMSTableVgroupMsg, SCMSTableVgroupRspMsg; typedef struct { + int32_t version; int32_t vgId; int8_t numOfIps; SIpAddr ipAddr[TSDB_MAX_REPLICA]; From dd45e955dc5ed8c2a9862abbcda319ced4531f68 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 06:53:56 +0000 Subject: [PATCH 03/16] fix bug --- src/client/src/tscServer.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4a11288a85..045aab61d6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -44,9 +44,18 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } +void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { + dst->numOfIps = src->numOfIps; + dst->inUse = src->inUse; + for (int32_t i = 0; i < src->numOfIps; ++i) { + dst->port[i] = htons(dst->port[i]); + strncpy(dst->fqdn[i], src->fqdn[i], TSDB_FQDN_LEN); + } +} static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { taosCorBeginRead(&tscMgmtIpSet.version); - *ipSet = tscMgmtIpSet.ipSet; + SRpcIpSet* src = &tscMgmtIpSet.ipSet; + tscIpSetCopy(ipSet, src); taosCorEndRead(&tscMgmtIpSet.version); } @@ -71,12 +80,7 @@ void tscSetMgmtIpList(SRpcIpSet *pIpSet) { taosCorBeginWrite(&tscMgmtIpSet.version); SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; - mgmtIpSet->numOfIps = pIpSet->numOfIps; - mgmtIpSet->inUse = pIpSet->inUse; - for (int32_t i = 0; i < mgmtIpSet->numOfIps; ++i) { - mgmtIpSet->port[i] = htons(pIpSet->port[i]); - strncpy(mgmtIpSet->fqdn[i], pIpSet->fqdn[i], TSDB_FQDN_LEN); - } + tscIpSetCopy(mgmtIpSet, pIpSet); taosCorEndWrite(&tscMgmtIpSet.version); } static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) { @@ -227,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { if (pSql->cmd.command >= TSDB_SQL_MGMT) { SRpcIpSet dump; tscDumpMgmtIpSet(&dump); - pSql->ipList = dump; + tscIpSetCopy(&pSql->ipList, &dump); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); From 6163e2401f5d18822c206475543a14019991c500 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 07:42:04 +0000 Subject: [PATCH 04/16] refor code --- src/client/src/tscServer.c | 41 ++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 045aab61d6..c82773a125 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -70,15 +70,10 @@ bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { } return true; } -void tscSetMgmtIpList(SRpcIpSet *pIpSet) { +void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { // no need to update if equal - SRpcIpSet dump; - tscDumpMgmtIpSet(&dump); - if (tscIpSetIsEqual(&dump, pIpSet)) { - return; - } - taosCorBeginWrite(&tscMgmtIpSet.version); + // or copy directly, tscMgmtIpSet.ipSet = *pIpSet SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; tscIpSetCopy(mgmtIpSet, pIpSet); taosCorEndWrite(&tscMgmtIpSet.version); @@ -97,7 +92,12 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI } taosCorEndRead(&pVgroupInfo->version); } -static void tscSetVgroupInfoWithIpSet(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { + +static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { + SSqlCmd *pCmd = &pObj->cmd; + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo; + taosCorBeginWrite(&pVgroupInfo->version); //TODO(dengyihao), dont care vgid pVgroupInfo->numOfIps = pIpSet->numOfIps; @@ -107,14 +107,6 @@ static void tscSetVgroupInfoWithIpSet(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIp } taosCorEndWrite(&pVgroupInfo->version); } -static void tscSetVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { - if (tscIpSetIsEqual(&pObj->ipList, pIpSet)) { - return; - } - SSqlCmd *pCmd = &pObj->cmd; - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); - tscSetVgroupInfoWithIpSet(&pTableMetaInfo->pTableMeta->vgroupInfo, pIpSet); -} void tscPrintMgmtIp() { SRpcIpSet dump; tscDumpMgmtIpSet(&dump); @@ -157,7 +149,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SRpcIpSet * pIpList = &pRsp->ipList; if (pIpList->numOfIps > 0) - tscSetMgmtIpList(pIpList); + tscUpdateMgmtIpList(pIpList); pSql->pTscObj->connId = htonl(pRsp->connId); @@ -231,6 +223,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { if (pSql->cmd.command >= TSDB_SQL_MGMT) { SRpcIpSet dump; tscDumpMgmtIpSet(&dump); + // no need to update pSql->ipList tscIpSetCopy(&pSql->ipList, &dump); } @@ -282,10 +275,14 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { return; } - if (pCmd->command < TSDB_SQL_MGMT) { - if (pIpSet) tscSetVgroupInfo(pSql, pIpSet); - } else { - if (pIpSet) tscSetMgmtIpList(pIpSet); + if (pIpSet) { + if (!tscIpSetIsEqual(&pSql->ipList, pIpSet)) { + if (pCmd->command < TSDB_SQL_MGMT) { + tscUpdateVgroupInfo(pSql, pIpSet); + } else { + tscUpdateMgmtIpList(pIpSet); + } + } } if (rpcMsg->pCont == NULL) { @@ -1994,7 +1991,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { tstrncpy(pObj->db, temp, sizeof(pObj->db)); if (pConnect->ipList.numOfIps > 0) - tscSetMgmtIpList(&pConnect->ipList); + tscUpdateMgmtIpList(&pConnect->ipList); strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; From 5f55393464f57939a17043d3a4a23d3f81ad8353 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 21:54:16 +0000 Subject: [PATCH 05/16] fixbug --- src/client/src/tscServer.c | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c82773a125..ced6914df2 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -48,7 +48,7 @@ void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { dst->numOfIps = src->numOfIps; dst->inUse = src->inUse; for (int32_t i = 0; i < src->numOfIps; ++i) { - dst->port[i] = htons(dst->port[i]); + dst->port[i] = src->port[i]; strncpy(dst->fqdn[i], src->fqdn[i], TSDB_FQDN_LEN); } } @@ -58,7 +58,11 @@ static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { tscIpSetCopy(ipSet, src); taosCorEndRead(&tscMgmtIpSet.version); } - +static void tscIpSetHtons(SRpcIpSet *s) { + for (int32_t i = 0; i < s->numOfIps; i++) { + s->port[i] = htons(s->port[i]); + } +} bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { if (s1->numOfIps != s2->numOfIps /*|| s1->inUse != s1->inUse*/) { return false; @@ -149,6 +153,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SRpcIpSet * pIpList = &pRsp->ipList; if (pIpList->numOfIps > 0) + tscIpSetHtons(pIpList); tscUpdateMgmtIpList(pIpList); pSql->pTscObj->connId = htonl(pRsp->connId); @@ -275,13 +280,15 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { return; } - if (pIpSet) { - if (!tscIpSetIsEqual(&pSql->ipList, pIpSet)) { - if (pCmd->command < TSDB_SQL_MGMT) { + if (pIpSet) { + //SRpcIpSet dump; + tscIpSetHtons(pIpSet); + if (tscIpSetIsEqual(&pSql->ipList, pIpSet)) { + if(pCmd->command < TSDB_SQL_MGMT) { tscUpdateVgroupInfo(pSql, pIpSet); } else { tscUpdateMgmtIpList(pIpSet); - } + } } } @@ -464,10 +471,8 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - SRpcIpSet dump; - tscDumpMgmtIpSet(&dump); - pSql->ipList = dump; - } else { // local handler + tscDumpMgmtIpSet(&pSql->ipList); + } else { return (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -1991,6 +1996,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { tstrncpy(pObj->db, temp, sizeof(pObj->db)); if (pConnect->ipList.numOfIps > 0) + tscIpSetHtons(&pConnect->ipList); tscUpdateMgmtIpList(&pConnect->ipList); strcpy(pObj->sversion, pConnect->serverVersion); From d2823d60e2bc30dffa04970bf970de53761f076a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 22:17:23 +0000 Subject: [PATCH 06/16] modify code style --- src/client/src/tscServer.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ced6914df2..b162387685 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -226,10 +226,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - SRpcIpSet dump; - tscDumpMgmtIpSet(&dump); - // no need to update pSql->ipList - tscIpSetCopy(&pSql->ipList, &dump); + tscDumpMgmtIpSet(&pSql->ipList); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -471,7 +468,7 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - tscDumpMgmtIpSet(&pSql->ipList); + //tscDumpMgmtIpSet(&pSql->ipList); } else { return (*tscProcessMsgRsp[pCmd->command])(pSql); } From 65b32c3855ecfd7caf2aef977ada081e5e3b1b66 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Jul 2020 22:35:57 +0000 Subject: [PATCH 07/16] modify code style --- src/client/src/tscServer.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b162387685..3b3884eb07 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -100,6 +100,7 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { SSqlCmd *pCmd = &pObj->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo; taosCorBeginWrite(&pVgroupInfo->version); From 0ac592fbfd44686a875629a93760d6e48e24b8bf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 01:04:02 +0000 Subject: [PATCH 08/16] add inuse to vgroupinfo --- src/client/src/tscServer.c | 9 ++++++--- src/inc/taosmsg.h | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3b3884eb07..d0c9d3636b 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -64,7 +64,7 @@ static void tscIpSetHtons(SRpcIpSet *s) { } } bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { - if (s1->numOfIps != s2->numOfIps /*|| s1->inUse != s1->inUse*/) { + if (s1->numOfIps != s2->numOfIps || s1->inUse != s2->inUse) { return false; } for (int32_t i = 0; i < s1->numOfIps; i++) { @@ -88,7 +88,8 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI if (vgId) { *vgId = pVgroupInfo->vgId; } - pIpSet->inUse = 0; + int8_t inUse = pVgroupInfo->inUse; + pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; pIpSet->numOfIps = pVgroupInfo->numOfIps; for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); @@ -104,7 +105,8 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo; taosCorBeginWrite(&pVgroupInfo->version); - //TODO(dengyihao), dont care vgid + //TODO(dengyihao), dont care vgid + pVgroupInfo->inUse = pIpSet->inUse; pVgroupInfo->numOfIps = pIpSet->numOfIps; for (int32_t i = 0; pVgroupInfo->numOfIps; i++) { strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN); @@ -1897,6 +1899,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; pVgroups->version = 0; + pVgroups->inUse = 0; pVgroups->vgId = htonl(pVgroups->vgId); assert(pVgroups->numOfIps >= 1); for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c12aed2bcc..fec3d77d8e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -649,6 +649,7 @@ typedef struct SCMSTableVgroupMsg { typedef struct { int32_t version; int32_t vgId; + int8_t inUse; int8_t numOfIps; SIpAddr ipAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo; From 700504900fc6419f6e87f26d2449ab2168144017 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 01:32:00 +0000 Subject: [PATCH 09/16] compile error --- src/client/src/tscServer.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d0c9d3636b..cafdcfbf25 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -155,9 +155,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SRpcIpSet * pIpList = &pRsp->ipList; - if (pIpList->numOfIps > 0) + if (pIpList->numOfIps > 0) { tscIpSetHtons(pIpList); tscUpdateMgmtIpList(pIpList); + } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -1996,9 +1997,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - if (pConnect->ipList.numOfIps > 0) + if (pConnect->ipList.numOfIps > 0) { tscIpSetHtons(&pConnect->ipList); tscUpdateMgmtIpList(&pConnect->ipList); + } strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; From 54ab10737a7dfc0b433680a16751858077fb6e8b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 01:43:22 +0000 Subject: [PATCH 10/16] compile error --- src/client/src/tscServer.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cafdcfbf25..9425b32aab 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -26,6 +26,7 @@ #include "ttime.h" #include "ttimer.h" #include "tutil.h" +#include "tlockfree.h" #define TSC_MGMT_VNODE 999 From 7aa062c9ad5679891f0d949160146c02eaa6b71a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 03:03:04 +0000 Subject: [PATCH 11/16] modify function --- src/util/inc/tlockfree.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/inc/tlockfree.h b/src/util/inc/tlockfree.h index e425d71d27..a81f597832 100644 --- a/src/util/inc/tlockfree.h +++ b/src/util/inc/tlockfree.h @@ -75,7 +75,7 @@ void taosRUnLockLatch(SRWLatch *pLatch); // copy on read #define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \ - int32_t old_ = atomic_load_32(x); \ + int32_t old_ = atomic_add_fetch_32((x), 0); \ if (old_ & 0x00000001) { \ if (i_ % 1000 == 0) { \ sched_yield(); \ @@ -84,7 +84,7 @@ void taosRUnLockLatch(SRWLatch *pLatch); } #define taosCorEndRead(x) \ - if (atomic_load_32(x) == old_) { \ + if (atomic_add_fetch_32((x), 0) == old_) { \ break; \ } \ } From a28e9a9402fcb802fa7a301d76385fb083d6db3d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 03:21:16 +0000 Subject: [PATCH 12/16] test --- src/client/src/tscServer.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9425b32aab..7269173c8c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -85,7 +85,7 @@ void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { } static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) { if (pVgroupInfo == NULL) { return;} - taosCorBeginRead(&pVgroupInfo->version); + //taosCorBeginRead(&pVgroupInfo->version); if (vgId) { *vgId = pVgroupInfo->vgId; } @@ -96,7 +96,7 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; } - taosCorEndRead(&pVgroupInfo->version); + //taosCorEndRead(&pVgroupInfo->version); } static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { @@ -105,7 +105,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo; - taosCorBeginWrite(&pVgroupInfo->version); + //taosCorBeginWrite(&pVgroupInfo->version); //TODO(dengyihao), dont care vgid pVgroupInfo->inUse = pIpSet->inUse; pVgroupInfo->numOfIps = pIpSet->numOfIps; @@ -113,7 +113,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN); pVgroupInfo->ipAddr[i].port = pIpSet->port[i]; } - taosCorEndWrite(&pVgroupInfo->version); + //taosCorEndWrite(&pVgroupInfo->version); } void tscPrintMgmtIp() { SRpcIpSet dump; From 3c4e620f31257219d3871857f8c95102f87efecc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 05:41:19 +0000 Subject: [PATCH 13/16] bugfix --- src/client/inc/tsclient.h | 10 ++++++- src/client/src/tscSchemaUtil.c | 16 ++++++++--- src/client/src/tscServer.c | 49 ++++++++++++++++++++-------------- src/inc/taosmsg.h | 2 -- 4 files changed, 50 insertions(+), 27 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e1180c4143..3a420567a9 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -52,12 +52,20 @@ typedef struct STableComInfo { int32_t rowSize; } STableComInfo; +typedef struct SCMCorVgroupInfo { + int32_t version; + int8_t inUse; + int8_t numOfIps; + SIpAddr ipAddr[TSDB_MAX_REPLICA]; +} SCMCorVgroupInfo; + typedef struct STableMeta { STableComInfo tableInfo; uint8_t tableType; int16_t sversion; int16_t tversion; - SCMVgroupInfo vgroupInfo; + SCMVgroupInfo vgroupInfo; + SCMCorVgroupInfo corVgroupInfo; int32_t sid; // the index of one table in a virtual node uint64_t uid; // unique id of a table SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index b391a5760e..52342b3650 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() { strcpy(s.name, TSQL_TBNAME_L); return s; } - +static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) { + corVgroupInfo->version = 0; + corVgroupInfo->inUse = 0; + corVgroupInfo->numOfIps = vgroupInfo->numOfIps; + for (int32_t i = 0; i < corVgroupInfo->numOfIps; i++) { + strncpy(corVgroupInfo->ipAddr[i].fqdn, vgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); + corVgroupInfo->ipAddr[i].port = vgroupInfo->ipAddr[i].port; + } +} STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { assert(pTableMetaMsg != NULL); @@ -157,9 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; pTableMeta->vgroupInfo = pTableMetaMsg->vgroup; - //init version here - pTableMeta->vgroupInfo.version = 0; - + + tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo); + pTableMeta->sversion = pTableMetaMsg->sversion; pTableMeta->tversion = pTableMetaMsg->tversion; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7269173c8c..cb97307b04 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -45,6 +45,20 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } +static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { + SRpcIpSet* pIpList = &pSql->ipList; + pIpList->inUse = 0; + if (pVgroupInfo == NULL) { + pIpList->numOfIps = 0; + return; + } + + pIpList->numOfIps = pVgroupInfo->numOfIps; + for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { + strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); + pIpList->port[i] = pVgroupInfo->ipAddr[i].port; + } +} void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { dst->numOfIps = src->numOfIps; dst->inUse = src->inUse; @@ -83,12 +97,9 @@ void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { tscIpSetCopy(mgmtIpSet, pIpSet); taosCorEndWrite(&tscMgmtIpSet.version); } -static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) { +static void tscDumpIpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { if (pVgroupInfo == NULL) { return;} - //taosCorBeginRead(&pVgroupInfo->version); - if (vgId) { - *vgId = pVgroupInfo->vgId; - } + taosCorBeginRead(&pVgroupInfo->version); int8_t inUse = pVgroupInfo->inUse; pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; pIpSet->numOfIps = pVgroupInfo->numOfIps; @@ -96,16 +107,16 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; } - //taosCorEndRead(&pVgroupInfo->version); + taosCorEndRead(&pVgroupInfo->version); } static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { SSqlCmd *pCmd = &pObj->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} - SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo; + SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo; - //taosCorBeginWrite(&pVgroupInfo->version); + taosCorBeginWrite(&pVgroupInfo->version); //TODO(dengyihao), dont care vgid pVgroupInfo->inUse = pIpSet->inUse; pVgroupInfo->numOfIps = pIpSet->numOfIps; @@ -113,7 +124,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN); pVgroupInfo->ipAddr[i].port = pIpSet->port[i]; } - //taosCorEndWrite(&pVgroupInfo->version); + taosCorEndWrite(&pVgroupInfo->version); } void tscPrintMgmtIp() { SRpcIpSet dump; @@ -577,7 +588,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscDumpIpSetFromVgroupInfo(&pTableMeta->vgroupInfo, &pSql->ipList, NULL); + tscDumpIpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->ipList); tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->ipList.numOfIps); @@ -619,9 +630,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - int32_t vgId = 0; - tscDumpIpSetFromVgroupInfo(pVgroupInfo, &pSql->ipList, &vgId); - pQueryMsg->head.vgId = htonl(vgId); + tscSetDnodeIpList(pSql, pVgroupInfo); + if (pVgroupInfo != NULL) { + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); + } STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->sid); @@ -639,10 +651,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); - // set the vgroup info - int32_t vgId = 0; - tscDumpIpSetFromVgroupInfo(&pTableIdList->vgInfo, &pSql->ipList, &vgId); - pQueryMsg->head.vgId = htonl(vgId); + // set the vgroup info + tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); + pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables @@ -1374,7 +1385,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->vgroupInfo, &pSql->ipList, NULL); + tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->ipList); return TSDB_CODE_SUCCESS; } @@ -1900,8 +1911,6 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - pVgroups->version = 0; - pVgroups->inUse = 0; pVgroups->vgId = htonl(pVgroups->vgId); assert(pVgroups->numOfIps >= 1); for (int32_t k = 0; k < pVgroups->numOfIps; ++k) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index fec3d77d8e..b7afaf1e06 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -647,9 +647,7 @@ typedef struct SCMSTableVgroupMsg { } SCMSTableVgroupMsg, SCMSTableVgroupRspMsg; typedef struct { - int32_t version; int32_t vgId; - int8_t inUse; int8_t numOfIps; SIpAddr ipAddr[TSDB_MAX_REPLICA]; } SCMVgroupInfo; From 246ea1df248e8721bad64f40fd57ed0ec0db263c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 18 Jul 2020 10:31:31 +0000 Subject: [PATCH 14/16] modify from ip to ep --- src/client/inc/tsclient.h | 4 +- src/client/src/tscSchemaUtil.c | 8 +-- src/client/src/tscServer.c | 95 +++++++++++++++------------------- src/client/src/tscSql.c | 2 +- src/client/src/tscUtil.c | 2 +- src/inc/trpc.h | 6 +-- 6 files changed, 53 insertions(+), 64 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 83f9165d35..b5455ed1fb 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -55,8 +55,8 @@ typedef struct STableComInfo { typedef struct SCMCorVgroupInfo { int32_t version; int8_t inUse; - int8_t numOfIps; - SIpAddr ipAddr[TSDB_MAX_REPLICA]; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SCMCorVgroupInfo; typedef struct STableMeta { diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 52342b3650..9b8f48b109 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -143,10 +143,10 @@ struct SSchema tscGetTbnameColumnSchema() { static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) { corVgroupInfo->version = 0; corVgroupInfo->inUse = 0; - corVgroupInfo->numOfIps = vgroupInfo->numOfIps; - for (int32_t i = 0; i < corVgroupInfo->numOfIps; i++) { - strncpy(corVgroupInfo->ipAddr[i].fqdn, vgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); - corVgroupInfo->ipAddr[i].port = vgroupInfo->ipAddr[i].port; + corVgroupInfo->numOfEps = vgroupInfo->numOfEps; + for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) { + strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port; } } STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3d78063bed..d9922b8718 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -53,64 +53,53 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { return; } - pEpList->numOfEps = pVgroupInfo->numOfEps; + pEpSet->numOfEps = pVgroupInfo->numOfEps; for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { - strcpy(pEpList->fqdn[i], pVgroupInfo->epAddr[i].fqdn); - pEpList->port[i] = pVgroupInfo->epAddr[i].port; + strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } } -void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) { - dst->numOfIps = src->numOfIps; - dst->inUse = src->inUse; - for (int32_t i = 0; i < src->numOfIps; ++i) { - dst->port[i] = src->port[i]; - strncpy(dst->fqdn[i], src->fqdn[i], TSDB_FQDN_LEN); - } -} -static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) { - taosCorBeginRead(&tscMgmtIpSet.version); - SRpcIpSet* src = &tscMgmtIpSet.ipSet; - tscIpSetCopy(ipSet, src); - taosCorEndRead(&tscMgmtIpSet.version); +static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { + taosCorBeginRead(&tscMgmtEpSet.version); + *epSet = tscMgmtEpSet.epSet; + taosCorEndRead(&tscMgmtEpSet.version); } -static void tscIpSetHtons(SRpcIpSet *s) { - for (int32_t i = 0; i < s->numOfIps; i++) { +static void tscEpSetHtons(SRpcEpSet *s) { + for (int32_t i = 0; i < s->numOfEps; i++) { s->port[i] = htons(s->port[i]); } } -bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) { - if (s1->numOfIps != s2->numOfIps || s1->inUse != s2->inUse) { +bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { + if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { return false; } - for (int32_t i = 0; i < s1->numOfIps; i++) { + for (int32_t i = 0; i < s1->numOfEps; i++) { if (s1->port[i] != s2->port[i] || strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0) return false; } return true; } -void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) { +void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) { // no need to update if equal - taosCorBeginWrite(&tscMgmtIpSet.version); - // or copy directly, tscMgmtIpSet.ipSet = *pIpSet - SRpcIpSet *mgmtIpSet = &tscMgmtIpSet.ipSet; - tscIpSetCopy(mgmtIpSet, pIpSet); - taosCorEndWrite(&tscMgmtIpSet.version); + taosCorBeginWrite(&tscMgmtEpSet.version); + tscMgmtEpSet.epSet = *pEpSet; + taosCorEndWrite(&tscMgmtEpSet.version); } -static void tscDumpIpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) { +static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) { if (pVgroupInfo == NULL) { return;} taosCorBeginRead(&pVgroupInfo->version); int8_t inUse = pVgroupInfo->inUse; - pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; - pIpSet->numOfIps = pVgroupInfo->numOfIps; - for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN); - pIpSet->port[i] = pVgroupInfo->ipAddr[i].port; + pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0; + pEpSet->numOfEps = pVgroupInfo->numOfEps; + for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { + strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN); + pEpSet->port[i] = pVgroupInfo->epAddr[i].port; } taosCorEndRead(&pVgroupInfo->version); } -static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) { +static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { SSqlCmd *pCmd = &pObj->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;} @@ -130,7 +119,7 @@ void tscPrintMgmtEp() { SRpcEpSet dump; tscDumpMgmtEpSet(&dump); if (dump.numOfEps <= 0) { - tscError("invalid mnode EP list:%d", dump.numOfEPs); + tscError("invalid mnode EP list:%d", dump.numOfEps); } else { for (int i = 0; i < dump.numOfEps; ++i) { tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]); @@ -166,10 +155,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; - SRpcEpSet * pEpList = &pRsp->epList; - if (pEpList->numOfEps > 0) { - tscEpSetHtons(pEpList); - tscUpdateMgmtEpList(pEpList); + SRpcEpSet * epSet = &pRsp->epSet; + if (epSet->numOfEps > 0) { + tscEpSetHtons(epSet); + tscUpdateMgmtEpSet(epSet); } pSql->pTscObj->connId = htonl(pRsp->connId); @@ -242,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { // set the mgmt ip list if (pSql->cmd.command >= TSDB_SQL_MGMT) { - tscDumpMgmtEpSet(&pSql->epList); + tscDumpMgmtEpSet(&pSql->epSet); } memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); @@ -296,11 +285,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pEpSet) { //SRpcEpSet dump; tscEpSetHtons(pEpSet); - if (tscEpSetIsEqual(&pSql->epList, pEpSet)) { + if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) { if(pCmd->command < TSDB_SQL_MGMT) { tscUpdateVgroupInfo(pSql, pEpSet); } else { - tscUpdateMgmtEpList(pEpSet); + tscUpdateMgmtEpSet(pEpSet); } } } @@ -589,7 +578,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epList); + tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit, pSql->epSet.numOfEps); @@ -631,7 +620,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - tscSetDnodeEpList(pSql, pVgroupInfo); + tscSetDnodeEpSet(pSql, pVgroupInfo); if (pVgroupInfo != NULL) { pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); @@ -654,7 +643,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); // set the vgroup info - tscSetDnodeEpList(pSql, &pTableIdList->vgInfo); + tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo); pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); @@ -1200,11 +1189,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShowMsg->payloadLen = htons(pPattern->n); } } else { - SSQLToken *pIpAddr = &pShowInfo->prefix; - assert(pIpAddr->n > 0 && pIpAddr->type > 0); + SSQLToken *pEpAddr = &pShowInfo->prefix; + assert(pEpAddr->n > 0 && pEpAddr->type > 0); - strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n); - pShowMsg->payloadLen = htons(pIpAddr->n); + strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n); + pShowMsg->payloadLen = htons(pEpAddr->n); } pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen; @@ -1387,7 +1376,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epList); + tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet); return TSDB_CODE_SUCCESS; } @@ -2011,9 +2000,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= sizeof(pObj->db)); tstrncpy(pObj->db, temp, sizeof(pObj->db)); - if (pConnect->epList.numOfEps > 0) { - tscEpSetHtons(&pConnect->epList); - tscUpdateMgmtEpList(&pConnect->epList); + if (pConnect->epSet.numOfEps > 0) { + tscEpSetHtons(&pConnect->epSet); + tscUpdateMgmtEpSet(&pConnect->epSet); } strcpy(pObj->sversion, pConnect->serverVersion); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 59b506a454..5848b7b82f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -62,7 +62,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } if (ip) { - if (tscSetMgmtEpListFromCfg(ip, NULL) < 0) return NULL; + if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL; if (port) tscMgmtEpSet.epSet.port[0] = port; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c3cae58d8a..957bdeeb7f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2145,7 +2145,7 @@ char* strdup_throw(const char* str) { return p; } -int tscSetMgmtEpListFromCfg(const char *first, const char *second) { +int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { // init mgmt ip set tscMgmtEpSet.version = 0; SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet); diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 41ed98fd70..bdee917b5e 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,10 +35,10 @@ typedef struct SRpcEpSet { char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; } SRpcEpSet; -typedef struct SRpcCorIpSet { +typedef struct SRpcCorEpSet { int32_t version; - SRpcIpSet ipSet; -} SRpcCorIpSet; + SRpcEpSet epSet; +} SRpcCorEpSet; typedef struct SRpcConnInfo { uint32_t clientIp; From 4a6c170d96607251389741df7fbc8fc7ab5019ac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 19 Jul 2020 17:53:06 +0800 Subject: [PATCH 15/16] change hash data in sdb --- src/mnode/src/mnodeSdb.c | 79 ++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 52 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 0f72dbdec4..42ded7ed06 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -77,11 +77,6 @@ typedef struct { pthread_mutex_t mutex; } SSdbObject; -typedef struct { - int32_t rowSize; - void * row; -} SSdbRow; - typedef struct { pthread_t thread; int32_t workerId; @@ -419,32 +414,28 @@ void sdbDecRef(void *handle, void *pObj) { } } -static SSdbRow *sdbGetRowMeta(SSdbTable *pTable, void *key) { +static void *sdbGetRowMeta(SSdbTable *pTable, void *key) { if (pTable == NULL) return NULL; int32_t keySize = sizeof(int32_t); if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { keySize = strlen((char *)key); } - - return taosHashGet(pTable->iHandle, key, keySize); + + void **ppRow = (void **)taosHashGet(pTable->iHandle, key, keySize); + if (ppRow == NULL) return NULL; + return *ppRow; } -static SSdbRow *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { +static void *sdbGetRowMetaFromObj(SSdbTable *pTable, void *key) { return sdbGetRowMeta(pTable, sdbGetObjKey(pTable, key)); } void *sdbGetRow(void *handle, void *key) { - SSdbTable *pTable = (SSdbTable *)handle; - int32_t keySize = sizeof(int32_t); - if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { - keySize = strlen((char *)key); - } - - SSdbRow *pMeta = taosHashGet(pTable->iHandle, key, keySize); - if (pMeta) { - sdbIncRef(pTable, pMeta->row); - return pMeta->row; + void *pRow = sdbGetRowMeta(handle, key); + if (pRow) { + sdbIncRef(handle, pRow); + return pRow; } else { return NULL; } @@ -455,10 +446,6 @@ static void *sdbGetRowFromObj(SSdbTable *pTable, void *key) { } static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { - SSdbRow rowMeta; - rowMeta.rowSize = pOper->rowSize; - rowMeta.row = pOper->pObj; - void * key = sdbGetObjKey(pTable, pOper->pObj); int32_t keySize = sizeof(int32_t); @@ -466,7 +453,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) { keySize = strlen((char *)key); } - taosHashPut(pTable->iHandle, key, keySize, &rowMeta, sizeof(SSdbRow)); + taosHashPut(pTable->iHandle, key, keySize, &pOper->pObj, sizeof(void **)); sdbIncRef(pTable, pOper->pObj); atomic_add_fetch_32(&pTable->numOfRows, 1); @@ -586,17 +573,17 @@ static int sdbWrite(void *param, void *data, int type) { code = (*pTable->decodeFp)(&oper); return sdbInsertHash(pTable, &oper); } else if (action == SDB_ACTION_DELETE) { - SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); - if (rowMeta == NULL || rowMeta->row == NULL) { + void *pRow = sdbGetRowMeta(pTable, pHead->cont); + if (pRow == NULL) { sdbError("table:%s, failed to get object:%s from wal while dispose delete action", pTable->tableName, pHead->cont); return TSDB_CODE_SUCCESS; } - SSdbOper oper = {.table = pTable, .pObj = rowMeta->row}; + SSdbOper oper = {.table = pTable, .pObj = pRow}; return sdbDeleteHash(pTable, &oper); } else if (action == SDB_ACTION_UPDATE) { - SSdbRow *rowMeta = sdbGetRowMeta(pTable, pHead->cont); - if (rowMeta == NULL || rowMeta->row == NULL) { + void *pRow = sdbGetRowMeta(pTable, pHead->cont); + if (pRow == NULL) { sdbError("table:%s, failed to get object:%s from wal while dispose update action", pTable->tableName, pHead->cont); return TSDB_CODE_SUCCESS; @@ -675,18 +662,12 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); - if (pMeta == NULL) { + void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj); + if (pRow == NULL) { sdbDebug("table:%s, record is not there, delete failed", pTable->tableName); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - void *pMetaRow = pMeta->row; - if (pMetaRow == NULL) { - sdbError("table:%s, record meta is null", pTable->tableName); - return TSDB_CODE_MND_SDB_INVAID_META_ROW; - } - sdbIncRef(pTable, pOper->pObj); int32_t code = sdbDeleteHash(pTable, pOper); @@ -728,18 +709,12 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; - SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj); - if (pMeta == NULL) { + void *pRow = sdbGetRowMetaFromObj(pTable, pOper->pObj); + if (pRow == NULL) { sdbDebug("table:%s, record is not there, update failed", pTable->tableName); return TSDB_CODE_MND_SDB_OBJ_NOT_THERE; } - void *pMetaRow = pMeta->row; - if (pMetaRow == NULL) { - sdbError("table:%s, record meta is null", pTable->tableName); - return TSDB_CODE_MND_SDB_INVAID_META_ROW; - } - int32_t code = sdbUpdateHash(pTable, pOper); if (code != TSDB_CODE_SUCCESS) { sdbError("table:%s, failed to update hash", pTable->tableName); @@ -789,14 +764,14 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { return NULL; } - SSdbRow *pMeta = taosHashIterGet(pIter); - if (pMeta == NULL) { + void **ppMetaRow = taosHashIterGet(pIter); + if (ppMetaRow == NULL) { taosHashDestroyIter(pIter); return NULL; } - *ppRow = pMeta->row; - sdbIncRef(handle, pMeta->row); + *ppRow = *ppMetaRow; + sdbIncRef(handle, *ppMetaRow); return pIter; } @@ -846,11 +821,11 @@ void sdbCloseTable(void *handle) { SHashMutableIterator *pIter = taosHashCreateIter(pTable->iHandle); while (taosHashIterNext(pIter)) { - SSdbRow *pMeta = taosHashIterGet(pIter); - if (pMeta == NULL) continue; + void **ppRow = taosHashIterGet(pIter); + if (ppRow == NULL) continue; SSdbOper oper = { - .pObj = pMeta->row, + .pObj = *ppRow, .table = pTable, }; From 19e2109614435838e028f9db70c780ae5ea8239f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 19 Jul 2020 18:11:23 +0800 Subject: [PATCH 16/16] fix refcount error while drop stable --- src/mnode/src/mnodeTable.c | 41 +++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 75ed442cd4..7478d7cd78 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -107,42 +107,41 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) { SVgObj *pVgroup = mnodeGetVgroup(pTable->vgId); if (pVgroup == NULL) { mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId); - return TSDB_CODE_MND_VGROUP_NOT_EXIST; } - mnodeDecVgroupRef(pVgroup); - SDbObj *pDb = mnodeGetDb(pVgroup->dbName); - if (pDb == NULL) { - mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); - return TSDB_CODE_MND_INVALID_DB; + SDbObj *pDb = NULL; + if (pVgroup != NULL) { + pDb = mnodeGetDb(pVgroup->dbName); + if (pDb == NULL) { + mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); + } } - - if (pDb->status != TSDB_DB_STATUS_READY) { - mError("db:%s, status:%d, in dropping", pDb->name, pDb->status); - return TSDB_CODE_MND_DB_IN_DROPPING; - } - mnodeDecDbRef(pDb); - SAcctObj *pAcct = mnodeGetAcct(pDb->acct); - if (pAcct == NULL) { - mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct); - return TSDB_CODE_MND_INVALID_ACCT; + SAcctObj *pAcct = NULL; + if (pDb != NULL) { + pAcct = mnodeGetAcct(pDb->acct); + if (pAcct == NULL) { + mError("ctable:%s, acct:%s not exists", pTable->info.tableId, pDb->acct); + } } - mnodeDecAcctRef(pAcct); if (pTable->info.type == TSDB_CHILD_TABLE) { // add ref pTable->superTable = mnodeGetSuperTableByUid(pTable->suid); mnodeAddTableIntoStable(pTable->superTable, pTable); grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); - pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); + if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); } else { grantAdd(TSDB_GRANT_TIMESERIES, pTable->numOfColumns - 1); - pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); + if (pAcct) pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); } - mnodeAddTableIntoDb(pDb); - mnodeAddTableIntoVgroup(pVgroup, pTable); + if (pDb) mnodeAddTableIntoDb(pDb); + if (pVgroup) mnodeAddTableIntoVgroup(pVgroup, pTable); + + mnodeDecVgroupRef(pVgroup); + mnodeDecDbRef(pDb); + mnodeDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; }