[td-4151] code refactor.
This commit is contained in:
parent
c62e036042
commit
9326bee430
|
@ -305,7 +305,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
|
||||||
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
||||||
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
|
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
|
||||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVgroupInfo);
|
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupList);
|
||||||
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length);
|
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length);
|
||||||
|
|
||||||
bool tscSetSqlOwner(SSqlObj* pSql);
|
bool tscSetSqlOwner(SSqlObj* pSql);
|
||||||
|
|
|
@ -246,6 +246,11 @@ typedef struct SQueryInfo {
|
||||||
bool onlyTagQuery;
|
bool onlyTagQuery;
|
||||||
} SQueryInfo;
|
} SQueryInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
STableMeta *pTableMeta;
|
||||||
|
SVgroupsInfo *pVgroupInfo;
|
||||||
|
} STableMetaVgroupInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int command;
|
int command;
|
||||||
uint8_t msgType;
|
uint8_t msgType;
|
||||||
|
|
|
@ -5607,10 +5607,8 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN
|
||||||
* And then launching multiple async-queries against all qualified virtual nodes, during the first-stage
|
* And then launching multiple async-queries against all qualified virtual nodes, during the first-stage
|
||||||
* query operation.
|
* query operation.
|
||||||
*/
|
*/
|
||||||
int32_t code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
|
// assert(allVgroupInfoRetrieved(pQueryInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// No tables included. No results generated. Query results are empty.
|
// No tables included. No results generated. Query results are empty.
|
||||||
if (pTableMetaInfo->vgroupList->numOfVgroups == 0) {
|
if (pTableMetaInfo->vgroupList->numOfVgroups == 0) {
|
||||||
|
@ -7172,6 +7170,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
STableMeta* pTableMeta = calloc(1, maxSize);
|
STableMeta* pTableMeta = calloc(1, maxSize);
|
||||||
|
|
||||||
SArray* plist = taosArrayInit(4, POINTER_BYTES);
|
SArray* plist = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
SArray* pVgroupList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||||
SName* pname = taosArrayGet(tableNameList, i);
|
SName* pname = taosArrayGet(tableNameList, i);
|
||||||
tNameExtractFullName(pname, name);
|
tNameExtractFullName(pname, name);
|
||||||
|
@ -7183,14 +7183,19 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
if (pTableMeta->id.uid > 0) {
|
if (pTableMeta->id.uid > 0) {
|
||||||
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
|
||||||
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMeta, name);
|
int32_t code = tscCreateTableMetaFromCChildMeta(pTableMeta, name);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) { // add to retrieve list
|
||||||
// add to retrieve list
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
} else if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||||
|
// the vgroup list of a super table is not kept in local buffer, so here need retrieve it
|
||||||
|
// from the mnode each time
|
||||||
|
char* t = strdup(name);
|
||||||
|
taosArrayPush(pVgroupList, &t);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
||||||
taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &pMeta, POINTER_BYTES);
|
STableMetaVgroupInfo p = {.pTableMeta = pMeta,};
|
||||||
|
taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &p, sizeof(STableMetaVgroupInfo));
|
||||||
} else {// add to the retrieve table meta array list.
|
} else {// add to the retrieve table meta array list.
|
||||||
char* t = strdup(name);
|
char* t = strdup(name);
|
||||||
taosArrayPush(plist, &t);
|
taosArrayPush(plist, &t);
|
||||||
|
@ -7199,7 +7204,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||||
|
|
||||||
// load the table meta for a given table name list
|
// load the table meta for a given table name list
|
||||||
if (taosArrayGetSize(plist) > 0) {
|
if (taosArrayGetSize(plist) > 0) {
|
||||||
int32_t code = getMultiTableMetaFromMnode(pSql, plist, true);
|
int32_t code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList);
|
||||||
taosArrayDestroyEx(plist, freeElem);
|
taosArrayDestroyEx(plist, freeElem);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -7256,7 +7261,11 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* name = tNameGetTableName(&pTableMetaInfo->name);
|
const char* name = tNameGetTableName(&pTableMetaInfo->name);
|
||||||
pTableMetaInfo->pTableMeta = taosHashGet(pCmd->pTableMetaMap, name, strlen(name));
|
STableMetaVgroupInfo* p = taosHashGet(pCmd->pTableMetaMap, name, strlen(name));
|
||||||
|
|
||||||
|
pTableMetaInfo->pTableMeta = p->pTableMeta;
|
||||||
|
pTableMetaInfo->vgroupList = p->pVgroupInfo;
|
||||||
|
|
||||||
assert(pTableMetaInfo->pTableMeta != NULL);
|
assert(pTableMetaInfo->pTableMeta != NULL);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -7394,16 +7403,9 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo);
|
||||||
if (isSTable) {
|
|
||||||
code = tscGetSTableVgroupInfo(pSql, pQueryInfo); // TODO refactor: getTablemeta along with vgroupInfo
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY);
|
int32_t type = isSTable? TSDB_QUERY_TYPE_STABLE_QUERY:TSDB_QUERY_TYPE_TABLE_QUERY;
|
||||||
} else {
|
TSDB_QUERY_SET_TYPE(pQueryInfo->type, type);
|
||||||
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY);
|
|
||||||
}
|
|
||||||
|
|
||||||
// parse the group by clause in the first place
|
// parse the group by clause in the first place
|
||||||
if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
|
if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -7527,7 +7529,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
||||||
pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES);
|
pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPushBatch(pQueryInfo->exprList1, (void*) p, numOfExpr);
|
taosArrayAddBatch(pQueryInfo->exprList1, (void*) p, numOfExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
|
|
@ -1829,7 +1829,7 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) {
|
||||||
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId);
|
||||||
|
|
||||||
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
pMetaMsg->uid = htobe64(pMetaMsg->uid);
|
||||||
pMetaMsg->contLen = htons(pMetaMsg->contLen);
|
// pMetaMsg->contLen = htonl(pMetaMsg->contLen);
|
||||||
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
|
pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns);
|
||||||
|
|
||||||
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
|
if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) &&
|
||||||
|
@ -1941,16 +1941,61 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) {
|
||||||
* multi table meta rsp pkg format:
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg;
|
||||||
* | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
|
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
|
||||||
* | 4B
|
|
||||||
*/
|
*size = (int32_t) (sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg));
|
||||||
|
|
||||||
|
size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
|
||||||
|
SVgroupsInfo* pVgroupInfo = calloc(1, vgroupsz);
|
||||||
|
assert(pVgroupInfo != NULL);
|
||||||
|
|
||||||
|
pVgroupInfo->numOfVgroups = pVgroupMsg->numOfVgroups;
|
||||||
|
if (pVgroupInfo->numOfVgroups <= 0) {
|
||||||
|
tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", id);
|
||||||
|
} else {
|
||||||
|
for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) {
|
||||||
|
// just init, no need to lock
|
||||||
|
SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j];
|
||||||
|
|
||||||
|
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||||
|
vmsg->vgId = htonl(vmsg->vgId);
|
||||||
|
vmsg->numOfEps = vmsg->numOfEps;
|
||||||
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
|
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
|
||||||
|
pVgroup->numOfEps = newVi.numOfEps;
|
||||||
|
pVgroup->vgId = newVi.vgId;
|
||||||
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
|
pVgroup->epAddr[k].port = newVi.ep[k].port;
|
||||||
|
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if current buffer contains the vgroup info.
|
||||||
|
// If not, add it
|
||||||
|
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
|
||||||
|
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
|
|
||||||
|
if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
|
||||||
|
(existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||||
|
taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
|
||||||
|
tscDebug("0x%"PRIx64" add new VgroupInfo, vgId:%d, total cached:%d", id, newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pVgroupInfo;
|
||||||
|
}
|
||||||
|
|
||||||
int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
char *rsp = pSql->res.pRsp;
|
char *rsp = pSql->res.pRsp;
|
||||||
|
|
||||||
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
||||||
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
|
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
|
||||||
|
pMultiMeta->numOfVgroup = htonl(pMultiMeta->numOfVgroup);
|
||||||
|
|
||||||
rsp += sizeof(SMultiTableMeta);
|
rsp += sizeof(SMultiTableMeta);
|
||||||
|
|
||||||
|
@ -1959,8 +2004,9 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
char* pMsg = pMultiMeta->meta;
|
||||||
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
|
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
|
||||||
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMultiMeta->meta;
|
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg;
|
||||||
int32_t code = tableMetaMsgConvert(pMetaMsg);
|
int32_t code = tableMetaMsgConvert(pMetaMsg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1972,13 +2018,14 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t t = tscGetTableMetaSize(pTableMeta);
|
|
||||||
|
|
||||||
SName sn = {0};
|
SName sn = {0};
|
||||||
tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
const char* tableName = tNameGetTableName(&sn);
|
const char* tableName = tNameGetTableName(&sn);
|
||||||
int32_t keyLen = strlen(tableName);
|
int32_t keyLen = strlen(tableName);
|
||||||
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, pTableMeta, t);
|
|
||||||
|
STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,};
|
||||||
|
taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo));
|
||||||
|
|
||||||
bool addToBuf = false;
|
bool addToBuf = false;
|
||||||
if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) {
|
if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) {
|
||||||
|
@ -1991,10 +2038,26 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
// if the vgroup is not updated in current process, update it.
|
// if the vgroup is not updated in current process, update it.
|
||||||
int64_t vgId = pMetaMsg->vgroup.vgId;
|
int64_t vgId = pMetaMsg->vgroup.vgId;
|
||||||
if (taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
|
if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) {
|
||||||
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
|
doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup);
|
||||||
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
|
taosHashPut(pSet, &vgId, sizeof(vgId), "", 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMsg += pMetaMsg->contLen;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMultiMeta->numOfVgroup > 0) {
|
||||||
|
char* name = pMsg;
|
||||||
|
pMsg += TSDB_TABLE_NAME_LEN;
|
||||||
|
|
||||||
|
STableMetaVgroupInfo* p = taosHashGet(pParentCmd->pTableMetaMap, name, strnlen(name, TSDB_TABLE_NAME_LEN));
|
||||||
|
assert(p != NULL);
|
||||||
|
|
||||||
|
int32_t size = 0;
|
||||||
|
SVgroupsInfo* pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
|
||||||
|
|
||||||
|
p->pVgroupInfo = pVgroupInfo;
|
||||||
|
pMsg += size;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->res.code = TSDB_CODE_SUCCESS;
|
pSql->res.code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2027,9 +2090,9 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
||||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg;
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg;
|
||||||
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
|
pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups);
|
||||||
|
|
||||||
size_t size = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg);
|
int32_t size = 0;
|
||||||
|
pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self);
|
||||||
size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
|
/* size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo);
|
||||||
pInfo->vgroupList = calloc(1, vgroupsz);
|
pInfo->vgroupList = calloc(1, vgroupsz);
|
||||||
assert(pInfo->vgroupList != NULL);
|
assert(pInfo->vgroupList != NULL);
|
||||||
|
|
||||||
|
@ -2068,7 +2131,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
pMsg += size;
|
pMsg += size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2403,7 +2466,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVgroupInfo) {
|
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList) {
|
||||||
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
|
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
|
||||||
if (NULL == pNew) {
|
if (NULL == pNew) {
|
||||||
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
|
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
|
||||||
|
@ -2414,8 +2477,10 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVg
|
||||||
pNew->signature = pNew;
|
pNew->signature = pNew;
|
||||||
pNew->cmd.command = TSDB_SQL_MULTI_META;
|
pNew->cmd.command = TSDB_SQL_MULTI_META;
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pNameList);
|
int32_t numOfTable = (int32_t) taosArrayGetSize(pNameList);
|
||||||
int32_t size = numOfTables * TSDB_TABLE_FNAME_LEN + sizeof(SMultiTableInfoMsg);
|
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pVgroupNameList);
|
||||||
|
|
||||||
|
int32_t size = (numOfTable + numOfVgroupList) * TSDB_TABLE_FNAME_LEN + sizeof(SMultiTableInfoMsg);
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, size)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, size)) {
|
||||||
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
|
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
|
||||||
tscFreeSqlObj(pNew);
|
tscFreeSqlObj(pNew);
|
||||||
|
@ -2423,14 +2488,25 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVg
|
||||||
}
|
}
|
||||||
|
|
||||||
SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload;
|
SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload;
|
||||||
pInfo->loadVgroup = htonl(loadVgroupInfo? 1:0);
|
pInfo->numOfTables = htonl((uint32_t) taosArrayGetSize(pNameList));
|
||||||
pInfo->numOfTables = htonl(numOfTables);
|
pInfo->numOfVgroups = htonl((uint32_t) taosArrayGetSize(pVgroupNameList));
|
||||||
|
|
||||||
char* start = pInfo->tableNames;
|
char* start = pInfo->tableNames;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
for(int32_t i = 0; i < numOfTable; ++i) {
|
||||||
char* name = taosArrayGetP(pNameList, i);
|
char* name = taosArrayGetP(pNameList, i);
|
||||||
if (i < numOfTables - 1) {
|
if (i < numOfTable - 1 || numOfVgroupList > 0) {
|
||||||
|
len = sprintf(start, "%s,", name);
|
||||||
|
} else {
|
||||||
|
len = sprintf(start, "%s", name);
|
||||||
|
}
|
||||||
|
|
||||||
|
start += len;
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||||
|
char* name = taosArrayGetP(pVgroupNameList, i);
|
||||||
|
if (i < numOfVgroupList - 1) {
|
||||||
len = sprintf(start, "%s, ", name);
|
len = sprintf(start, "%s, ", name);
|
||||||
} else {
|
} else {
|
||||||
len = sprintf(start, "%s", name);
|
len = sprintf(start, "%s", name);
|
||||||
|
@ -2443,8 +2519,8 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVg
|
||||||
pNew->cmd.msgType = TSDB_MSG_TYPE_CM_TABLES_META;
|
pNew->cmd.msgType = TSDB_MSG_TYPE_CM_TABLES_META;
|
||||||
|
|
||||||
registerSqlObj(pNew);
|
registerSqlObj(pNew);
|
||||||
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, loadVgroup:%d, msg size:%d", pSql->self,
|
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, vgroupInfo:%d, msg size:%d", pSql->self,
|
||||||
pNew->self, numOfTables, loadVgroupInfo, pNew->cmd.payloadLen);
|
pNew->self, numOfTable, numOfVgroupList, pNew->cmd.payloadLen);
|
||||||
|
|
||||||
pNew->fp = tscTableMetaCallBack;
|
pNew->fp = tscTableMetaCallBack;
|
||||||
pNew->param = (void *)pSql->self;
|
pNew->param = (void *)pSql->self;
|
||||||
|
|
|
@ -703,7 +703,7 @@ typedef struct {
|
||||||
} STableInfoMsg;
|
} STableInfoMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t loadVgroup;
|
int32_t numOfVgroups;
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
char tableNames[];
|
char tableNames[];
|
||||||
} SMultiTableInfoMsg;
|
} SMultiTableInfoMsg;
|
||||||
|
|
|
@ -1724,16 +1724,19 @@ static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTa
|
||||||
return contLen;
|
return contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char* serializeVgroupInfo(SSTableObj *pTable, char* msg, SMnodeMsg* pMsgBody, void* handle) {
|
static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) {
|
||||||
*(uint64_t*)msg = htobe64(pTable->uid);
|
SName sn = {0};
|
||||||
msg += sizeof(sizeof(pTable->uid));
|
tNameFromString(&sn, name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
const char* tableName = tNameGetTableName(&sn);
|
||||||
|
|
||||||
|
strncpy(msg, tableName, TSDB_TABLE_NAME_LEN);
|
||||||
|
msg += TSDB_TABLE_NAME_LEN;
|
||||||
|
|
||||||
if (pTable->vgHash == NULL) {
|
if (pTable->vgHash == NULL) {
|
||||||
mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, stableName);
|
mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, name);
|
||||||
mnodeDecTableRef(pTable);
|
mnodeDecTableRef(pTable);
|
||||||
|
|
||||||
// even this super table has no corresponding table, still return
|
// even this super table has no corresponding table, still return
|
||||||
int64_t uid = htobe64(pTable->uid);
|
|
||||||
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
||||||
pVgroupMsg->numOfVgroups = 0;
|
pVgroupMsg->numOfVgroups = 0;
|
||||||
|
|
||||||
|
@ -1805,59 +1808,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle);
|
msg = serializeVgroupInfo(pTable, stableName, msg, pMsg, pMsg->rpcMsg.ahandle);
|
||||||
pRsp->numOfTables++;
|
pRsp->numOfTables++;
|
||||||
|
|
||||||
// if (pTable->vgHash == NULL) {
|
|
||||||
// mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle,
|
|
||||||
// stableName);
|
|
||||||
// mnodeDecTableRef(pTable);
|
|
||||||
//
|
|
||||||
// // even this super table has no corresponding table, still return
|
|
||||||
// pRsp->numOfTables++;
|
|
||||||
//
|
|
||||||
// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
|
||||||
// pVgroupMsg->numOfVgroups = 0;
|
|
||||||
//
|
|
||||||
// msg += sizeof(SVgroupsMsg);
|
|
||||||
// } else {
|
|
||||||
// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg;
|
|
||||||
// mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle,
|
|
||||||
// pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash));
|
|
||||||
//
|
|
||||||
// int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL);
|
|
||||||
// int32_t vgSize = 0;
|
|
||||||
// while (pVgId) {
|
|
||||||
// SVgObj *pVgroup = mnodeGetVgroup(*pVgId);
|
|
||||||
// pVgId = taosHashIterate(pTable->vgHash, pVgId);
|
|
||||||
// if (pVgroup == NULL) continue;
|
|
||||||
//
|
|
||||||
// pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
|
|
||||||
// pVgroupMsg->vgroups[vgSize].numOfEps = 0;
|
|
||||||
//
|
|
||||||
// for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
|
||||||
// SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
|
|
||||||
// if (pDnode == NULL) break;
|
|
||||||
//
|
|
||||||
// tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
|
|
||||||
// pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort);
|
|
||||||
//
|
|
||||||
// pVgroupMsg->vgroups[vgSize].numOfEps++;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// vgSize++;
|
|
||||||
// mnodeDecVgroupRef(pVgroup);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// taosHashCancelIterate(pTable->vgHash, pVgId);
|
|
||||||
// mnodeDecTableRef(pTable);
|
|
||||||
//
|
|
||||||
// pVgroupMsg->numOfVgroups = htonl(vgSize);
|
|
||||||
//
|
|
||||||
// // one table is done, try the next table
|
|
||||||
// msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg);
|
|
||||||
// pRsp->numOfTables++;
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->numOfTables != numOfTable) {
|
if (pRsp->numOfTables != numOfTable) {
|
||||||
|
@ -2887,8 +2839,9 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||||
|
|
||||||
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
||||||
pInfo->loadVgroup = htonl(pInfo->loadVgroup);
|
pInfo->numOfVgroups = htonl(pInfo->numOfVgroups);
|
||||||
|
|
||||||
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
|
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
|
||||||
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
|
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
|
||||||
|
@ -2925,8 +2878,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
int availLen = totalMallocLen - pMultiMeta->contLen;
|
int remain = totalMallocLen - pMultiMeta->contLen;
|
||||||
if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
|
if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
|
||||||
totalMallocLen *= 2;
|
totalMallocLen *= 2;
|
||||||
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
|
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
|
||||||
if (pMultiMeta == NULL) {
|
if (pMultiMeta == NULL) {
|
||||||
|
@ -2942,7 +2895,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
|
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
|
||||||
} else {
|
} else {
|
||||||
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
|
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
|
||||||
taosArrayPush(pList, fullName); // keep the super table full name
|
|
||||||
|
// keep the full name for each super table for retrieve vgroup list
|
||||||
|
taosArrayPush(pList, &fullName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2955,16 +2910,26 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
// add the additional super table names that needs the vgroup info
|
||||||
char* name = taosArrayGet(pList, i);
|
for(int32_t i = 0; i < pInfo->numOfVgroups; ++i) {
|
||||||
|
char *fullName = (char *)(pInfo->tableNames + (i + pInfo->numOfTables) * TSDB_TABLE_FNAME_LEN);
|
||||||
|
taosArrayPush(pList, fullName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the pVgroupList into the pList
|
||||||
|
int32_t numOfStable = (int32_t) taosArrayGetSize(pList);
|
||||||
|
pMultiMeta->numOfVgroup = htonl(numOfStable);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfStable; ++i) {
|
||||||
|
char* name = taosArrayGetP(pList, i);
|
||||||
SSTableObj *pTable = mnodeGetSuperTable(name);
|
SSTableObj *pTable = mnodeGetSuperTable(name);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName);
|
mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, name);
|
||||||
mnodeDecTableRef(pTable);
|
mnodeDecTableRef(pTable);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle);
|
msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
pMultiMeta->contLen = (msg - (char*) pMultiMeta);
|
pMultiMeta->contLen = (msg - (char*) pMultiMeta);
|
||||||
|
|
|
@ -4779,7 +4779,7 @@ static void mergeTableBlockDist(STableBlockDist* pDist, const STableBlockDist* p
|
||||||
pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo));
|
pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPushBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos));
|
taosArrayAddBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos));
|
||||||
}
|
}
|
||||||
|
|
||||||
void block_func_merge(SQLFunctionCtx* pCtx) {
|
void block_func_merge(SQLFunctionCtx* pCtx) {
|
||||||
|
|
|
@ -215,7 +215,7 @@ SArray* createQueryPlanImpl(SQueryInfo* pQueryInfo) {
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
|
SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
|
||||||
SArray* p = createQueryPlanImpl(pq);
|
SArray* p = createQueryPlanImpl(pq);
|
||||||
taosArrayPushBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
|
taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1149,7 +1149,7 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSubBlocks && taosArrayPushBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) {
|
if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,15 @@ void* taosArrayInit(size_t size, size_t elemSize);
|
||||||
* @param nEles
|
* @param nEles
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void *taosArrayPushBatch(SArray *pArray, const void *pData, int nEles);
|
void *taosArrayAddBatch(SArray *pArray, const void *pData, int nEles);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* add all element from the source array list into the destination
|
||||||
|
* @param pArray
|
||||||
|
* @param pInput
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void* taosArrayAddAll(SArray* pArray, const SArray* pInput);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -59,7 +67,7 @@ void *taosArrayPushBatch(SArray *pArray, const void *pData, int nEles);
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE void* taosArrayPush(SArray* pArray, const void* pData) {
|
static FORCE_INLINE void* taosArrayPush(SArray* pArray, const void* pData) {
|
||||||
return taosArrayPushBatch(pArray, pData, 1);
|
return taosArrayAddBatch(pArray, pData, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -55,7 +55,7 @@ static int32_t taosArrayResize(SArray* pArray) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosArrayPushBatch(SArray* pArray, const void* pData, int nEles) {
|
void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) {
|
||||||
if (pArray == NULL || pData == NULL) {
|
if (pArray == NULL || pData == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,10 @@ void* taosArrayPushBatch(SArray* pArray, const void* pData, int nEles) {
|
||||||
return dst;
|
return dst;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void* taosArrayAddAll(SArray* pArray, const SArray* pInput) {
|
||||||
|
return taosArrayAddBatch(pArray, pInput->pData, (int32_t) taosArrayGetSize(pInput));
|
||||||
|
}
|
||||||
|
|
||||||
void* taosArrayPop(SArray* pArray) {
|
void* taosArrayPop(SArray* pArray) {
|
||||||
assert( pArray != NULL );
|
assert( pArray != NULL );
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue