[td-4151]
This commit is contained in:
parent
6c4c8e37fd
commit
c62e036042
|
@ -619,7 +619,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
case TSDB_SQL_SELECT: {
|
case TSDB_SQL_SELECT: {
|
||||||
const char* msg1 = "columns in select clause not identical";
|
const char* msg1 = "columns in select clause not identical";
|
||||||
loadAllTableMeta(pSql, pInfo);
|
int32_t code = loadAllTableMeta(pSql, pInfo);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
SQueryInfo* pCurrent = pCmd->pQueryInfo;
|
SQueryInfo* pCurrent = pCmd->pQueryInfo;
|
||||||
for(int32_t i = 0; i < pCmd->clauseIndex; ++i) {
|
for(int32_t i = 0; i < pCmd->clauseIndex; ++i) {
|
||||||
|
@ -7128,7 +7131,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeElem(void* p) {
|
static void freeElem(void* p) {
|
||||||
tfree(p);
|
tfree(*(char**)p);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
@ -7252,7 +7255,10 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
|
||||||
strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), tListLen(pTableMetaInfo->aliasName));
|
strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), tListLen(pTableMetaInfo->aliasName));
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tscGetTableMeta(pSql, pTableMetaInfo);
|
const char* name = tNameGetTableName(&pTableMetaInfo->name);
|
||||||
|
pTableMetaInfo->pTableMeta = taosHashGet(pCmd->pTableMetaMap, name, strlen(name));
|
||||||
|
assert(pTableMetaInfo->pTableMeta != NULL);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -7376,13 +7382,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
||||||
} else {
|
} else {
|
||||||
pQueryInfo->command = TSDB_SQL_SELECT;
|
pQueryInfo->command = TSDB_SQL_SELECT;
|
||||||
|
|
||||||
size_t fromSize = taosArrayGetSize(pSqlNode->from->list);
|
size_t numOfTables = taosArrayGetSize(pSqlNode->from->list);
|
||||||
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) {
|
if (numOfTables > TSDB_MAX_JOIN_TABLE_NUM) {
|
||||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||||
}
|
}
|
||||||
|
|
||||||
// set all query tables, which are maybe more than one.
|
// set all query tables, which are maybe more than one.
|
||||||
code = doLoadAllTableMeta(pSql, pQueryInfo, pSqlNode, (int32_t) fromSize);
|
code = doLoadAllTableMeta(pSql, pQueryInfo, pSqlNode, (int32_t) numOfTables);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1822,18 +1822,16 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
|
||||||
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
|
|
||||||
|
|
||||||
pMetaMsg->tid = htonl(pMetaMsg->tid);
|
pMetaMsg->tid = htonl(pMetaMsg->tid);
|
||||||
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
pMetaMsg->sversion = htons(pMetaMsg->sversion);
|
||||||
pMetaMsg->tversion = htons(pMetaMsg->tversion);
|
pMetaMsg->tversion = htons(pMetaMsg->tversion);
|
||||||
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
||||||
|
|
||||||
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
||||||
pMetaMsg->contLen = htons(pMetaMsg->contLen);
|
pMetaMsg->contLen = htons(pMetaMsg->contLen);
|
||||||
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
|
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
|
||||||
|
|
||||||
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
|
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
|
||||||
(pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
|
(pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) {
|
||||||
tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
|
tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId,
|
||||||
|
@ -1868,21 +1866,34 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
pSchema++;
|
pSchema++;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
|
||||||
assert(pTableMetaInfo->pTableMeta == NULL);
|
|
||||||
|
|
||||||
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
|
return TSDB_CODE_SUCCESS;
|
||||||
if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
|
}
|
||||||
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, tNameGetTableName(&pTableMetaInfo->name));
|
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
// update the vgroupInfo if needed
|
||||||
|
static void doUpdateVgroupInfo(STableMeta *pTableMeta, SVgroupMsg *pVgroupMsg) {
|
||||||
|
if (pTableMeta->vgId > 0) {
|
||||||
|
int32_t vgId = pTableMeta->vgId;
|
||||||
|
assert(pTableMeta->tableType != TSDB_SUPER_TABLE);
|
||||||
|
|
||||||
|
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
||||||
|
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
|
|
||||||
|
// vgroup info exists, compare with it
|
||||||
|
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) {
|
||||||
|
vgroupInfo = createNewVgroupInfo(pVgroupMsg);
|
||||||
|
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||||
|
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doAddTableMetaLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) {
|
||||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
||||||
// check if super table hashmap or not
|
// add or update the corresponding super table meta data info
|
||||||
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
|
int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
||||||
// super tableMeta data alreay exists, create it according to tableMeta and add it to hash map
|
// The super tableMeta already exists, create it according to tableMeta and add it to hash map
|
||||||
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
|
STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg);
|
||||||
|
|
||||||
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
|
uint32_t size = tscGetTableMetaSize(pSupTableMeta);
|
||||||
|
@ -1892,37 +1903,37 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
tfree(pSupTableMeta);
|
tfree(pSupTableMeta);
|
||||||
|
|
||||||
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
|
CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta);
|
||||||
|
taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta));
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
|
||||||
|
|
||||||
taosHashPut(tscTableMetaInfo, name, strlen(name), cMeta, sizeof(CChildTableMeta));
|
|
||||||
tfree(cMeta);
|
tfree(cMeta);
|
||||||
} else {
|
} else {
|
||||||
uint32_t s = tscGetTableMetaSize(pTableMeta);
|
uint32_t s = tscGetTableMetaSize(pTableMeta);
|
||||||
|
taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
|
||||||
|
int32_t code = tableMetaMsgConvert(pMetaMsg);
|
||||||
taosHashPut(tscTableMetaInfo, name, strlen(name), pTableMeta, s);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the vgroupInfo if needed
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
|
||||||
if (pTableMeta->vgId > 0) {
|
assert(pTableMetaInfo->pTableMeta == NULL);
|
||||||
int32_t vgId = pTableMeta->vgId;
|
|
||||||
assert(pTableMeta->tableType != TSDB_SUPER_TABLE);
|
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
|
||||||
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
|
||||||
|
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, tNameGetTableName(&pTableMetaInfo->name));
|
||||||
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, &pMetaMsg->vgroup)) ||
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
|
||||||
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
|
||||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
|
||||||
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
|
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||||
|
assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0);
|
||||||
|
|
||||||
|
doAddTableMetaLocalBuf(pTableMeta, pMetaMsg, true);
|
||||||
|
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid,
|
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid,
|
||||||
tNameGetTableName(&pTableMetaInfo->name));
|
tNameGetTableName(&pTableMetaInfo->name));
|
||||||
|
|
||||||
|
@ -1930,109 +1941,66 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/*
|
||||||
* multi table meta rsp pkg format:
|
* multi table meta rsp pkg format:
|
||||||
* | STaosRsp | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
|
* | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
|
||||||
* |...... 1B 4B
|
* | 4B
|
||||||
**/
|
*/
|
||||||
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
#if 0
|
|
||||||
char *rsp = pSql->res.pRsp;
|
char *rsp = pSql->res.pRsp;
|
||||||
|
|
||||||
ieType = *rsp;
|
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
||||||
if (ieType != TSDB_IE_TYPE_META) {
|
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
|
||||||
tscError("invalid ie type:%d", ieType);
|
|
||||||
pSql->res.code = TSDB_CODE_TSC_INVALID_IE;
|
|
||||||
pSql->res.numOfTotal = 0;
|
|
||||||
return TSDB_CODE_TSC_APP_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
rsp++;
|
rsp += sizeof(SMultiTableMeta);
|
||||||
|
|
||||||
SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
|
SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param);
|
||||||
totalNum = htonl(pInfo->numOfTables);
|
SSqlCmd *pParentCmd = &pParentSql->cmd;
|
||||||
rsp += sizeof(SMultiTableInfoMsg);
|
|
||||||
|
|
||||||
for (i = 0; i < totalNum; i++) {
|
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
|
||||||
STableMeta * pMeta = pMultiMeta->metas;
|
|
||||||
|
|
||||||
pMeta->sid = htonl(pMeta->sid);
|
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
|
||||||
pMeta->sversion = htons(pMeta->sversion);
|
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMultiMeta->meta;
|
||||||
pMeta->vgId = htonl(pMeta->vgId);
|
int32_t code = tableMetaMsgConvert(pMetaMsg);
|
||||||
pMeta->uid = htobe64(pMeta->uid);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
if (pMeta->sid <= 0 || pMeta->vgId < 0) {
|
|
||||||
tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid);
|
|
||||||
pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
|
|
||||||
pSql->res.numOfTotal = i;
|
|
||||||
return TSDB_CODE_TSC_APP_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pMeta->numOfColumns = htons(pMeta->numOfColumns);
|
STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg);
|
||||||
//
|
if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) {
|
||||||
// if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
|
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname);
|
||||||
// tscError("invalid tag value count:%d", pMeta->numOfTags);
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
// pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
|
}
|
||||||
// pSql->res.numOfTotal = i;
|
|
||||||
// return TSDB_CODE_TSC_APP_ERROR;
|
int32_t t = tscGetTableMetaSize(pTableMeta);
|
||||||
// }
|
|
||||||
//
|
SName sn = {0};
|
||||||
// if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) {
|
tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
// tscError("invalid numOfTags:%d", pMeta->numOfTags);
|
const char* tableName = tNameGetTableName(&sn);
|
||||||
// pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
|
int32_t keyLen = strlen(tableName);
|
||||||
// pSql->res.numOfTotal = i;
|
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, pTableMeta, t);
|
||||||
// return TSDB_CODE_TSC_APP_ERROR;
|
|
||||||
// }
|
bool addToBuf = false;
|
||||||
//
|
if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) {
|
||||||
// if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) {
|
addToBuf = true;
|
||||||
// tscError("invalid numOfColumns:%d", pMeta->numOfColumns);
|
taosHashPut(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid), "", 0);
|
||||||
// pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE;
|
}
|
||||||
// pSql->res.numOfTotal = i;
|
|
||||||
// return TSDB_CODE_TSC_APP_ERROR;
|
// create the tableMeta and add it into the TableMeta map
|
||||||
// }
|
doAddTableMetaLocalBuf(pTableMeta, pMetaMsg, addToBuf);
|
||||||
//
|
|
||||||
// for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) {
|
// if the vgroup is not updated in current process, update it.
|
||||||
// pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode);
|
int64_t vgId = pMetaMsg->vgroup.vgId;
|
||||||
// }
|
if (taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
|
||||||
//
|
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
|
||||||
// pMeta->rowSize = 0;
|
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
|
||||||
// rsp += sizeof(SMultiTableMeta);
|
}
|
||||||
// pSchema = (SSchema *)rsp;
|
|
||||||
//
|
|
||||||
// int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
|
|
||||||
// for (int j = 0; j < numOfTotalCols; ++j) {
|
|
||||||
// pSchema->bytes = htons(pSchema->bytes);
|
|
||||||
// pSchema->colId = htons(pSchema->colId);
|
|
||||||
//
|
|
||||||
// // ignore the tags length
|
|
||||||
// if (j < pMeta->numOfColumns) {
|
|
||||||
// pMeta->rowSize += pSchema->bytes;
|
|
||||||
// }
|
|
||||||
// pSchema++;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// rsp += numOfTotalCols * sizeof(SSchema);
|
|
||||||
//
|
|
||||||
// int32_t tagLen = 0;
|
|
||||||
// SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
|
|
||||||
//
|
|
||||||
// if (pMeta->tableType == TSDB_CHILD_TABLE) {
|
|
||||||
// for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
|
|
||||||
// tagLen += pTagsSchema[j].bytes;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// rsp += tagLen;
|
|
||||||
// int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->res.code = TSDB_CODE_SUCCESS;
|
pSql->res.code = TSDB_CODE_SUCCESS;
|
||||||
pSql->res.numOfTotal = i;
|
pSql->res.numOfTotal = pMultiMeta->numOfTables;
|
||||||
tscDebug("0x%"PRIx64" load multi-metermeta resp from complete num:%d", pSql->self, pSql->res.numOfTotal);
|
tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables);
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2670,7 +2638,7 @@ void tscInitMsgsFp() {
|
||||||
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
|
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
|
||||||
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
|
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
|
||||||
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
|
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
|
||||||
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp;
|
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
|
||||||
|
|
||||||
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
|
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
|
||||||
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
|
||||||
|
|
|
@ -754,6 +754,7 @@ typedef struct STableMetaMsg {
|
||||||
|
|
||||||
typedef struct SMultiTableMeta {
|
typedef struct SMultiTableMeta {
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
|
int32_t numOfVgroup;
|
||||||
int32_t contLen;
|
int32_t contLen;
|
||||||
char meta[];
|
char meta[];
|
||||||
} SMultiTableMeta;
|
} SMultiTableMeta;
|
||||||
|
|
|
@ -1709,11 +1709,7 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
|
||||||
SSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont;
|
|
||||||
int32_t numOfTable = htonl(pInfo->numOfTables);
|
|
||||||
|
|
||||||
// reserve space
|
|
||||||
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
|
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||||
|
@ -1725,6 +1721,72 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
||||||
mnodeDecTableRef(pTable);
|
mnodeDecTableRef(pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return contLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static char* serializeVgroupInfo(SSTableObj *pTable, char* msg, SMnodeMsg* pMsgBody, void* handle) {
|
||||||
|
*(uint64_t*)msg = htobe64(pTable->uid);
|
||||||
|
msg += sizeof(sizeof(pTable->uid));
|
||||||
|
|
||||||
|
if (pTable->vgHash == NULL) {
|
||||||
|
mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, stableName);
|
||||||
|
mnodeDecTableRef(pTable);
|
||||||
|
|
||||||
|
// even this super table has no corresponding table, still return
|
||||||
|
int64_t uid = htobe64(pTable->uid);
|
||||||
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||||
|
pVgroupMsg->numOfVgroups = 0;
|
||||||
|
|
||||||
|
msg += sizeof(SVgroupsMsg);
|
||||||
|
} else {
|
||||||
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||||
|
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsgBody, handle,
|
||||||
|
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
|
||||||
|
|
||||||
|
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
||||||
|
int32_t vgSize = 0;
|
||||||
|
while (pVgId) {
|
||||||
|
SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
||||||
|
pVgId = taosHashIterate(pTable->vgHash, pVgId);
|
||||||
|
if (pVgroup == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
|
||||||
|
pVgroupMsg->vgroups[vgSize].numOfEps = 0;
|
||||||
|
|
||||||
|
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
||||||
|
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
|
||||||
|
if (pDnode == NULL) break;
|
||||||
|
|
||||||
|
tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
|
||||||
|
pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort);
|
||||||
|
|
||||||
|
pVgroupMsg->vgroups[vgSize].numOfEps++;
|
||||||
|
}
|
||||||
|
|
||||||
|
vgSize++;
|
||||||
|
mnodeDecVgroupRef(pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCancelIterate(pTable->vgHash, pVgId);
|
||||||
|
mnodeDecTableRef(pTable);
|
||||||
|
|
||||||
|
pVgroupMsg->numOfVgroups = htonl(vgSize);
|
||||||
|
|
||||||
|
// one table is done, try the next table
|
||||||
|
msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
||||||
|
SSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||||
|
int32_t numOfTable = htonl(pInfo->numOfTables);
|
||||||
|
|
||||||
|
// calculate the required space.
|
||||||
|
int32_t contLen = calculateVgroupMsgLength(pInfo, numOfTable);
|
||||||
SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||||
|
@ -1735,62 +1797,67 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||||
|
|
||||||
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
||||||
mnodeDecTableRef(pTable);
|
mnodeDecTableRef(pTable);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (pTable->vgHash == NULL) {
|
|
||||||
mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle,
|
|
||||||
stableName);
|
|
||||||
mnodeDecTableRef(pTable);
|
|
||||||
|
|
||||||
// even this super table has no corresponding table, still return
|
msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle);
|
||||||
pRsp->numOfTables++;
|
pRsp->numOfTables++;
|
||||||
|
|
||||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
// if (pTable->vgHash == NULL) {
|
||||||
pVgroupMsg->numOfVgroups = 0;
|
// mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
// stableName);
|
||||||
msg += sizeof(SVgroupsMsg);
|
// mnodeDecTableRef(pTable);
|
||||||
} else {
|
//
|
||||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
// // even this super table has no corresponding table, still return
|
||||||
mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
|
// pRsp->numOfTables++;
|
||||||
pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
|
//
|
||||||
|
// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||||
int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
// pVgroupMsg->numOfVgroups = 0;
|
||||||
int32_t vgSize = 0;
|
//
|
||||||
while (pVgId) {
|
// msg += sizeof(SVgroupsMsg);
|
||||||
SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
// } else {
|
||||||
pVgId = taosHashIterate(pTable->vgHash, pVgId);
|
// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||||
if (pVgroup == NULL) continue;
|
// mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
// pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
|
||||||
pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
|
//
|
||||||
pVgroupMsg->vgroups[vgSize].numOfEps = 0;
|
// int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
||||||
|
// int32_t vgSize = 0;
|
||||||
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
// while (pVgId) {
|
||||||
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
|
// SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
||||||
if (pDnode == NULL) break;
|
// pVgId = taosHashIterate(pTable->vgHash, pVgId);
|
||||||
|
// if (pVgroup == NULL) continue;
|
||||||
tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
|
//
|
||||||
pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort);
|
// pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
|
||||||
|
// pVgroupMsg->vgroups[vgSize].numOfEps = 0;
|
||||||
pVgroupMsg->vgroups[vgSize].numOfEps++;
|
//
|
||||||
}
|
// for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
||||||
|
// SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
|
||||||
vgSize++;
|
// if (pDnode == NULL) break;
|
||||||
mnodeDecVgroupRef(pVgroup);
|
//
|
||||||
}
|
// tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
|
||||||
|
// pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort);
|
||||||
taosHashCancelIterate(pTable->vgHash, pVgId);
|
//
|
||||||
mnodeDecTableRef(pTable);
|
// pVgroupMsg->vgroups[vgSize].numOfEps++;
|
||||||
|
// }
|
||||||
pVgroupMsg->numOfVgroups = htonl(vgSize);
|
//
|
||||||
|
// vgSize++;
|
||||||
// one table is done, try the next table
|
// mnodeDecVgroupRef(pVgroup);
|
||||||
msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg);
|
// }
|
||||||
pRsp->numOfTables++;
|
//
|
||||||
}
|
// taosHashCancelIterate(pTable->vgHash, pVgId);
|
||||||
|
// mnodeDecTableRef(pTable);
|
||||||
|
//
|
||||||
|
// pVgroupMsg->numOfVgroups = htonl(vgSize);
|
||||||
|
//
|
||||||
|
// // one table is done, try the next table
|
||||||
|
// msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg);
|
||||||
|
// pRsp->numOfTables++;
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->numOfTables != numOfTable) {
|
if (pRsp->numOfTables != numOfTable) {
|
||||||
|
@ -2818,14 +2885,13 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO. set the vgroup info for the super table
|
|
||||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||||
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
||||||
pInfo->loadVgroup = htonl(pInfo->loadVgroup);
|
pInfo->loadVgroup = htonl(pInfo->loadVgroup);
|
||||||
|
|
||||||
// first malloc 4KB, subsequent reallocation will expand the size as twice of the original size
|
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
|
||||||
int32_t totalMallocLen = 4 * 1024;
|
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
|
||||||
|
|
||||||
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
|
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
|
||||||
if (pMultiMeta == NULL) {
|
if (pMultiMeta == NULL) {
|
||||||
|
@ -2835,13 +2901,17 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
pMultiMeta->contLen = sizeof(SMultiTableMeta);
|
pMultiMeta->contLen = sizeof(SMultiTableMeta);
|
||||||
pMultiMeta->numOfTables = 0;
|
pMultiMeta->numOfTables = 0;
|
||||||
|
|
||||||
|
SArray* pList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
for (int32_t t = 0; t < pInfo->numOfTables; ++t) {
|
for (int32_t t = 0; t < pInfo->numOfTables; ++t) {
|
||||||
char *fullName = (char *)(pInfo->tableNames + t * TSDB_TABLE_FNAME_LEN);
|
char *fullName = (char *)(pInfo->tableNames + t * TSDB_TABLE_FNAME_LEN);
|
||||||
|
|
||||||
if (pMsg->pTable == NULL) {
|
if (pMsg->pTable == NULL) {
|
||||||
pMsg->pTable = mnodeGetTable(fullName);
|
pMsg->pTable = mnodeGetTable(fullName);
|
||||||
if (pMsg->pTable == NULL) { // TODO: return error to client?
|
if (pMsg->pTable == NULL) {
|
||||||
continue;
|
rpcFreeCont(pMultiMeta);
|
||||||
|
mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName);
|
||||||
|
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2850,8 +2920,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||||
mnodeDecTableRef(pMsg->pTable); // TODO: return error to client?
|
rpcFreeCont(pMultiMeta);
|
||||||
continue;
|
mnodeDecTableRef(pMsg->pTable);
|
||||||
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
int availLen = totalMallocLen - pMultiMeta->contLen;
|
int availLen = totalMallocLen - pMultiMeta->contLen;
|
||||||
|
@ -2864,13 +2935,14 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->meta + pMultiMeta->contLen);
|
STableMetaMsg *pMeta = (STableMetaMsg *)((char*) pMultiMeta + pMultiMeta->contLen);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (pMsg->pTable->type != TSDB_SUPER_TABLE) {
|
if (pMsg->pTable->type != TSDB_SUPER_TABLE) {
|
||||||
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
|
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
|
||||||
} else {
|
} else {
|
||||||
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
|
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
|
||||||
|
taosArrayPush(pList, fullName); // keep the super table full name
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2881,6 +2953,23 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
mnodeDecTableRef(pMsg->pTable);
|
mnodeDecTableRef(pMsg->pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||||
|
char* name = taosArrayGet(pList, i);
|
||||||
|
SSTableObj *pTable = mnodeGetSuperTable(name);
|
||||||
|
if (pTable == NULL) {
|
||||||
|
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
||||||
|
mnodeDecTableRef(pTable);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
pMultiMeta->contLen = (msg - (char*) pMultiMeta);
|
||||||
|
|
||||||
|
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
|
||||||
pMsg->rpcRsp.rsp = pMultiMeta;
|
pMsg->rpcRsp.rsp = pMultiMeta;
|
||||||
pMsg->rpcRsp.len = pMultiMeta->contLen;
|
pMsg->rpcRsp.len = pMultiMeta->contLen;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue