support multi thread update dnode
This commit is contained in:
parent
c877418df0
commit
3d10b6f81d
|
@ -157,6 +157,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
||||||
pTableMeta->sid = pTableMetaMsg->sid;
|
pTableMeta->sid = pTableMetaMsg->sid;
|
||||||
pTableMeta->uid = pTableMetaMsg->uid;
|
pTableMeta->uid = pTableMetaMsg->uid;
|
||||||
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
||||||
|
//init version here
|
||||||
|
pTableMeta->vgroupInfo.version = 0;
|
||||||
|
|
||||||
pTableMeta->sversion = pTableMetaMsg->sversion;
|
pTableMeta->sversion = pTableMetaMsg->sversion;
|
||||||
pTableMeta->tversion = pTableMetaMsg->tversion;
|
pTableMeta->tversion = pTableMetaMsg->tversion;
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,7 @@ static void tscDumpMgmtIpSet(SRpcIpSet *ipSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) {
|
bool tscIpSetIsEqual(SRpcIpSet *s1, SRpcIpSet *s2) {
|
||||||
if (s1->numOfIps != s2->numOfIps
|
if (s1->numOfIps != s2->numOfIps /*|| s1->inUse != s1->inUse*/) {
|
||||||
|| s1->inUse != s1->inUse) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < s1->numOfIps; i++) {
|
for (int32_t i = 0; i < s1->numOfIps; i++) {
|
||||||
|
@ -80,21 +79,38 @@ void tscSetMgmtIpList(SRpcIpSet *pIpSet) {
|
||||||
}
|
}
|
||||||
taosCorEndWrite(&tscMgmtIpSet.version);
|
taosCorEndWrite(&tscMgmtIpSet.version);
|
||||||
}
|
}
|
||||||
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
|
static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) {
|
||||||
SRpcIpSet* pIpList = &pSql->ipList;
|
if (pVgroupInfo == NULL) { return;}
|
||||||
pIpList->inUse = 0;
|
taosCorBeginRead(&pVgroupInfo->version);
|
||||||
if (pVgroupInfo == NULL) {
|
if (vgId) {
|
||||||
pIpList->numOfIps = 0;
|
*vgId = pVgroupInfo->vgId;
|
||||||
|
}
|
||||||
|
pIpSet->inUse = 0;
|
||||||
|
pIpSet->numOfIps = pVgroupInfo->numOfIps;
|
||||||
|
for (int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
|
||||||
|
strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||||
|
pIpSet->port[i] = pVgroupInfo->ipAddr[i].port;
|
||||||
|
}
|
||||||
|
taosCorEndRead(&pVgroupInfo->version);
|
||||||
|
}
|
||||||
|
static void tscSetVgroupInfoWithIpSet(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) {
|
||||||
|
taosCorBeginWrite(&pVgroupInfo->version);
|
||||||
|
//TODO(dengyihao), dont care vgid
|
||||||
|
pVgroupInfo->numOfIps = pIpSet->numOfIps;
|
||||||
|
for (int32_t i = 0; pVgroupInfo->numOfIps; i++) {
|
||||||
|
strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN);
|
||||||
|
pVgroupInfo->ipAddr[i].port = pIpSet->port[i];
|
||||||
|
}
|
||||||
|
taosCorEndWrite(&pVgroupInfo->version);
|
||||||
|
}
|
||||||
|
static void tscSetVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) {
|
||||||
|
if (tscIpSetIsEqual(&pObj->ipList, pIpSet)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
SSqlCmd *pCmd = &pObj->cmd;
|
||||||
pIpList->numOfIps = pVgroupInfo->numOfIps;
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
|
tscSetVgroupInfoWithIpSet(&pTableMetaInfo->pTableMeta->vgroupInfo, pIpSet);
|
||||||
strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
|
|
||||||
pIpList->port[i] = pVgroupInfo->ipAddr[i].port;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
void tscPrintMgmtIp() {
|
void tscPrintMgmtIp() {
|
||||||
SRpcIpSet dump;
|
SRpcIpSet dump;
|
||||||
tscDumpMgmtIpSet(&dump);
|
tscDumpMgmtIpSet(&dump);
|
||||||
|
@ -263,7 +279,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCmd->command < TSDB_SQL_MGMT) {
|
if (pCmd->command < TSDB_SQL_MGMT) {
|
||||||
if (pIpSet) pSql->ipList = *pIpSet;
|
if (pIpSet) tscSetVgroupInfo(pSql, pIpSet);
|
||||||
} else {
|
} else {
|
||||||
if (pIpSet) tscSetMgmtIpList(pIpSet);
|
if (pIpSet) tscSetMgmtIpList(pIpSet);
|
||||||
}
|
}
|
||||||
|
@ -553,7 +569,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
|
|
||||||
// pSql->cmd.payloadLen is set during copying data into payload
|
// pSql->cmd.payloadLen is set during copying data into payload
|
||||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||||
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
|
tscDumpIpSetFromVgroupInfo(&pTableMeta->vgroupInfo, &pSql->ipList, NULL);
|
||||||
|
|
||||||
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
|
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
|
||||||
pSql->ipList.numOfIps);
|
pSql->ipList.numOfIps);
|
||||||
|
@ -595,11 +611,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
} else {
|
} else {
|
||||||
pVgroupInfo = &pTableMeta->vgroupInfo;
|
pVgroupInfo = &pTableMeta->vgroupInfo;
|
||||||
}
|
}
|
||||||
|
int32_t vgId = 0;
|
||||||
tscSetDnodeIpList(pSql, pVgroupInfo);
|
tscDumpIpSetFromVgroupInfo(pVgroupInfo, &pSql->ipList, &vgId);
|
||||||
if (pVgroupInfo != NULL) {
|
pQueryMsg->head.vgId = htonl(vgId);
|
||||||
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
|
|
||||||
}
|
|
||||||
|
|
||||||
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
||||||
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
||||||
|
@ -618,8 +632,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
|
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
|
||||||
|
|
||||||
// set the vgroup info
|
// set the vgroup info
|
||||||
tscSetDnodeIpList(pSql, &pTableIdList->vgInfo);
|
int32_t vgId = 0;
|
||||||
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
|
tscDumpIpSetFromVgroupInfo(&pTableIdList->vgInfo, &pSql->ipList, &vgId);
|
||||||
|
pQueryMsg->head.vgId = htonl(vgId);
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
|
int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
|
||||||
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
|
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
|
||||||
|
@ -1351,7 +1366,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
||||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
|
||||||
tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);
|
tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->vgroupInfo, &pSql->ipList, NULL);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1875,11 +1890,11 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
memcpy(pInfo->vgroupList, pVgroupInfo, size);
|
memcpy(pInfo->vgroupList, pVgroupInfo, size);
|
||||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||||
|
//just init, no need to lock
|
||||||
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||||
|
pVgroups->version = 0;
|
||||||
pVgroups->vgId = htonl(pVgroups->vgId);
|
pVgroups->vgId = htonl(pVgroups->vgId);
|
||||||
assert(pVgroups->numOfIps >= 1);
|
assert(pVgroups->numOfIps >= 1);
|
||||||
|
|
||||||
for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
|
for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
|
||||||
pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
|
pVgroups->ipAddr[k].port = htons(pVgroups->ipAddr[k].port);
|
||||||
}
|
}
|
||||||
|
|
|
@ -647,6 +647,7 @@ typedef struct SCMSTableVgroupMsg {
|
||||||
} SCMSTableVgroupMsg, SCMSTableVgroupRspMsg;
|
} SCMSTableVgroupMsg, SCMSTableVgroupRspMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
int32_t version;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int8_t numOfIps;
|
int8_t numOfIps;
|
||||||
SIpAddr ipAddr[TSDB_MAX_REPLICA];
|
SIpAddr ipAddr[TSDB_MAX_REPLICA];
|
||||||
|
|
Loading…
Reference in New Issue