[td-4151]<enhance>: improve the validation procedure of sql statement.
This commit is contained in:
parent
a901745a03
commit
6c4c8e37fd
|
@ -305,6 +305,8 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
|
|||
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
|
||||
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
|
||||
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
|
||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVgroupInfo);
|
||||
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length);
|
||||
|
||||
bool tscSetSqlOwner(SSqlObj* pSql);
|
||||
void tscClearSqlOwner(SSqlObj* pSql);
|
||||
|
|
|
@ -7127,6 +7127,10 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void freeElem(void* p) {
|
||||
tfree(p);
|
||||
}
|
||||
|
||||
int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
||||
SSqlCmd* pCmd = &pSql->cmd;
|
||||
|
||||
|
@ -7164,7 +7168,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
int32_t numOfTables = taosArrayGetSize(tableNameList);
|
||||
STableMeta* pTableMeta = calloc(1, maxSize);
|
||||
|
||||
SArray* plist = taosArrayInit(4, sizeof(SName));
|
||||
SArray* plist = taosArrayInit(4, POINTER_BYTES);
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
SName* pname = taosArrayGet(tableNameList, i);
|
||||
tNameExtractFullName(pname, name);
|
||||
|
@ -7185,16 +7189,19 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
STableMeta* pMeta = tscTableMetaDup(pTableMeta);
|
||||
taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &pMeta, POINTER_BYTES);
|
||||
} else {// add to the retrieve table meta array list.
|
||||
taosArrayPush(plist, pname);
|
||||
char* t = strdup(name);
|
||||
taosArrayPush(plist, &t);
|
||||
}
|
||||
}
|
||||
|
||||
// load the table meta for a given table name list
|
||||
if (taosArrayGetSize(plist) > 0) {
|
||||
doGetTableMetaFromMnode();
|
||||
int32_t code = getMultiTableMetaFromMnode(pSql, plist, true);
|
||||
taosArrayDestroyEx(plist, freeElem);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
// return getTableMetaFromMnode(pSql, pTableMetaInfo);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1719,44 +1719,36 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
/**
|
||||
* multi table meta req pkg format:
|
||||
* | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
|
||||
* no used 4B
|
||||
* |SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
|
||||
* 4B
|
||||
**/
|
||||
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
#if 0
|
||||
int tscBuildMultiTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
// copy payload content to temp buff
|
||||
char *tmpData = 0;
|
||||
if (pCmd->payloadLen > 0) {
|
||||
if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
|
||||
memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
|
||||
}
|
||||
// copy payload content to temp buf
|
||||
// char *tmpData = 0;
|
||||
// if (pCmd->payloadLen > 0) {
|
||||
// if ((tmpData = calloc(1, pCmd->payloadLen + 1)) == NULL) return -1;
|
||||
// memcpy(tmpData, pCmd->payload, pCmd->payloadLen);
|
||||
// }
|
||||
|
||||
// fill head info
|
||||
SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
|
||||
memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN); // server don't need the db
|
||||
// SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload);
|
||||
// pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
|
||||
//
|
||||
// if (pCmd->payloadLen > 0) {
|
||||
// memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
|
||||
// }
|
||||
//
|
||||
// tfree(tmpData);
|
||||
|
||||
SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
|
||||
pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
|
||||
|
||||
if (pCmd->payloadLen > 0) {
|
||||
memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
|
||||
}
|
||||
|
||||
tfree(tmpData);
|
||||
|
||||
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
|
||||
// pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
|
||||
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLES_META;
|
||||
|
||||
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
|
||||
|
||||
tscDebug("0x%"PRIx64" build load multi-metermeta msg completed, numOfTables:%d, msg size:%d", pSql->self, pCmd->count,
|
||||
tscDebug("0x%"PRIx64" build load multi-tablemeta msg completed, numOfTables:%d, msg size:%d", pSql->self, pCmd->count,
|
||||
pCmd->payloadLen);
|
||||
|
||||
return pCmd->payloadLen;
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
|
@ -2443,6 +2435,63 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVgroupInfo) {
|
||||
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
|
||||
if (NULL == pNew) {
|
||||
tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pNew->pTscObj = pSql->pTscObj;
|
||||
pNew->signature = pNew;
|
||||
pNew->cmd.command = TSDB_SQL_MULTI_META;
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pNameList);
|
||||
int32_t size = numOfTables * TSDB_TABLE_FNAME_LEN + sizeof(SMultiTableInfoMsg);
|
||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, size)) {
|
||||
tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self);
|
||||
tscFreeSqlObj(pNew);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload;
|
||||
pInfo->loadVgroup = htonl(loadVgroupInfo? 1:0);
|
||||
pInfo->numOfTables = htonl(numOfTables);
|
||||
|
||||
char* start = pInfo->tableNames;
|
||||
int32_t len = 0;
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
char* name = taosArrayGetP(pNameList, i);
|
||||
if (i < numOfTables - 1) {
|
||||
len = sprintf(start, "%s,", name);
|
||||
} else {
|
||||
len = sprintf(start, "%s", name);
|
||||
}
|
||||
|
||||
start += len;
|
||||
}
|
||||
|
||||
pNew->cmd.payloadLen = (start - pInfo->tableNames) + sizeof(SMultiTableInfoMsg);
|
||||
pNew->cmd.msgType = TSDB_MSG_TYPE_CM_TABLES_META;
|
||||
|
||||
registerSqlObj(pNew);
|
||||
tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, loadVgroup:%d, msg size:%d", pSql->self,
|
||||
pNew->self, numOfTables, loadVgroupInfo, pNew->cmd.payloadLen);
|
||||
|
||||
pNew->fp = tscTableMetaCallBack;
|
||||
pNew->param = (void *)pSql->self;
|
||||
|
||||
tscDebug("0x%"PRIx64" metaRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->metaRid, pNew->self);
|
||||
|
||||
pSql->metaRid = pNew->self;
|
||||
int32_t code = tscBuildAndSendRequest(pNew, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
|
||||
assert(tIsValidName(&pTableMetaInfo->name));
|
||||
|
||||
|
@ -2604,7 +2653,6 @@ void tscInitMsgsFp() {
|
|||
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
|
||||
tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg;
|
||||
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
|
||||
tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg;
|
||||
|
||||
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
|
||||
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
|
||||
|
|
|
@ -924,92 +924,6 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
|
||||
// must before clean the sqlcmd object
|
||||
tscResetSqlCmd(&pSql->cmd, false);
|
||||
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
pCmd->command = TSDB_SQL_MULTI_META;
|
||||
pCmd->count = 0;
|
||||
|
||||
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
char *str = (char *)tblNameList;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
|
||||
if (pQueryInfo == NULL) {
|
||||
pSql->res.code = terrno;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
|
||||
|
||||
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
char *nextStr;
|
||||
char tblName[TSDB_TABLE_FNAME_LEN];
|
||||
int payloadLen = 0;
|
||||
char *pMsg = pCmd->payload;
|
||||
while (1) {
|
||||
nextStr = strchr(str, ',');
|
||||
if (nextStr == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
memcpy(tblName, str, nextStr - str);
|
||||
int32_t len = (int32_t)(nextStr - str);
|
||||
tblName[len] = '\0';
|
||||
|
||||
str = nextStr + 1;
|
||||
len = (int32_t)strtrim(tblName);
|
||||
|
||||
SStrToken sToken = {.n = len, .type = TK_ID, .z = tblName};
|
||||
tSQLGetToken(tblName, &sToken.type);
|
||||
|
||||
// Check if the table name available or not
|
||||
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
sprintf(pCmd->payload, "table name is invalid");
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((code = tscSetTableFullName(&pTableMetaInfo->name, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (++pCmd->count > TSDB_MULTI_TABLEMETA_MAX_NUM) {
|
||||
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
sprintf(pCmd->payload, "tables over the max number");
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t xlen = tNameLen(&pTableMetaInfo->name);
|
||||
if (payloadLen + xlen + 128 >= pCmd->allocSize) {
|
||||
char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + tblListLen);
|
||||
if (pNewMem == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
sprintf(pCmd->payload, "failed to allocate memory");
|
||||
return code;
|
||||
}
|
||||
|
||||
pCmd->payload = pNewMem;
|
||||
pCmd->allocSize = pCmd->allocSize + tblListLen;
|
||||
pMsg = pCmd->payload;
|
||||
}
|
||||
|
||||
char n[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, n);
|
||||
payloadLen += sprintf(pMsg + payloadLen, "%s,", n);
|
||||
}
|
||||
|
||||
*(pMsg + payloadLen) = '\0';
|
||||
pCmd->payloadLen = payloadLen + 1;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
|
||||
|
||||
|
@ -1022,52 +936,33 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||
pSql->pTscObj = taos;
|
||||
pSql->signature = pSql;
|
||||
pSql->fp = NULL; // todo set the correct callback function pointer
|
||||
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
pRes->code = 0;
|
||||
pRes->numOfTotal = 0; // the number of getting table meta from server
|
||||
pRes->numOfClauseTotal = 0;
|
||||
|
||||
assert(pSql->fp == NULL);
|
||||
tscDebug("0x%"PRIx64" tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
|
||||
|
||||
int32_t tblListLen = (int32_t)strlen(tableNameList);
|
||||
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
|
||||
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, tblListLen, MAX_TABLE_NAME_LENGTH);
|
||||
int32_t length = (int32_t)strlen(tableNameList);
|
||||
if (length > MAX_TABLE_NAME_LENGTH) {
|
||||
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, length, MAX_TABLE_NAME_LENGTH);
|
||||
tscFreeSqlObj(pSql);
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
||||
char *str = calloc(1, tblListLen + 1);
|
||||
char *str = calloc(1, length + 1);
|
||||
if (str == NULL) {
|
||||
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
|
||||
tscError("0x%"PRIx64" failed to allocate sql string buffer", pSql->self);
|
||||
tscFreeSqlObj(pSql);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
strtolower(str, tableNameList);
|
||||
int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
|
||||
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length);
|
||||
|
||||
/*
|
||||
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
|
||||
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscBuildAndSendRequest()
|
||||
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
|
||||
*/
|
||||
pRes->qId = 0;
|
||||
free(str);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscFreeSqlObj(pSql);
|
||||
return code;
|
||||
}
|
||||
|
||||
tscDoQuery(pSql);
|
||||
|
||||
tscDebug("0x%"PRIx64" load multi-table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
|
||||
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
|
||||
tscFreeSqlObj(pSql);
|
||||
}
|
||||
|
||||
registerSqlObj(pSql);
|
||||
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
|
||||
executeQuery(pSql, NULL);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2352,7 +2352,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
|
|||
}
|
||||
|
||||
pCmd->active = pQueryInfo;
|
||||
// pCmd->pQueryInfo[pCmd->numOfClause++] = pQueryInfo;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3737,3 +3736,87 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
pCmd->command = TSDB_SQL_MULTI_META;
|
||||
pCmd->count = 0;
|
||||
|
||||
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
char *str = (char *)pNameList;
|
||||
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
|
||||
if (pQueryInfo == NULL) {
|
||||
pSql->res.code = terrno;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
|
||||
|
||||
if ((code = tscAllocPayload(pCmd, length + 16)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
char *nextStr;
|
||||
char tblName[TSDB_TABLE_FNAME_LEN];
|
||||
int payloadLen = 0;
|
||||
char *pMsg = pCmd->payload;
|
||||
while (1) {
|
||||
nextStr = strchr(str, ',');
|
||||
if (nextStr == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
memcpy(tblName, str, nextStr - str);
|
||||
int32_t len = (int32_t)(nextStr - str);
|
||||
tblName[len] = '\0';
|
||||
|
||||
str = nextStr + 1;
|
||||
len = (int32_t)strtrim(tblName);
|
||||
|
||||
SStrToken sToken = {.n = len, .type = TK_ID, .z = tblName};
|
||||
tSQLGetToken(tblName, &sToken.type);
|
||||
|
||||
// Check if the table name available or not
|
||||
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
sprintf(pCmd->payload, "table name is invalid");
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((code = tscSetTableFullName(&pTableMetaInfo->name, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (++pCmd->count > TSDB_MULTI_TABLEMETA_MAX_NUM) {
|
||||
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||
sprintf(pCmd->payload, "tables over the max number");
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t xlen = tNameLen(&pTableMetaInfo->name);
|
||||
if (payloadLen + xlen + 128 >= pCmd->allocSize) {
|
||||
char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + length);
|
||||
if (pNewMem == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
sprintf(pCmd->payload, "failed to allocate memory");
|
||||
return code;
|
||||
}
|
||||
|
||||
pCmd->payload = pNewMem;
|
||||
pCmd->allocSize = pCmd->allocSize + length;
|
||||
pMsg = pCmd->payload;
|
||||
}
|
||||
|
||||
char n[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, n);
|
||||
payloadLen += sprintf(pMsg + payloadLen, "%s,", n);
|
||||
}
|
||||
|
||||
*(pMsg + payloadLen) = '\0';
|
||||
pCmd->payloadLen = payloadLen + 1;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
|
|||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "tables-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" )
|
||||
|
@ -703,8 +703,9 @@ typedef struct {
|
|||
} STableInfoMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t loadVgroup;
|
||||
int32_t numOfTables;
|
||||
char tableIds[];
|
||||
char tableNames[];
|
||||
} SMultiTableInfoMsg;
|
||||
|
||||
typedef struct SSTableVgroupMsg {
|
||||
|
@ -754,7 +755,7 @@ typedef struct STableMetaMsg {
|
|||
typedef struct SMultiTableMeta {
|
||||
int32_t numOfTables;
|
||||
int32_t contLen;
|
||||
char metas[];
|
||||
char meta[];
|
||||
} SMultiTableMeta;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1674,12 +1674,9 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSTableObj *pTable
|
|||
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
|
||||
}
|
||||
|
||||
static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
||||
static int32_t mnodeDoGetSuperTableMeta(SMnodeMsg *pMsg, STableMetaMsg* pMeta) {
|
||||
SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
|
||||
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
|
||||
if (pMeta == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMeta->uid = htobe64(pTable->uid);
|
||||
pMeta->sversion = htons(pTable->sversion);
|
||||
pMeta->tversion = htons(pTable->tversion);
|
||||
|
@ -1690,6 +1687,18 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
|||
pMeta->contLen = sizeof(STableMetaMsg) + mnodeSetSchemaFromSuperTable(pMeta->schema, pTable);
|
||||
tstrncpy(pMeta->tableFname, pTable->info.tableId, sizeof(pMeta->tableFname));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
||||
SSTableObj *pTable = (SSTableObj *)pMsg->pTable;
|
||||
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
|
||||
if (pMeta == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
mnodeDoGetSuperTableMeta(pMsg, pMeta);
|
||||
|
||||
pMsg->rpcRsp.len = pMeta->contLen;
|
||||
pMeta->contLen = htons(pMeta->contLen);
|
||||
|
||||
|
@ -2417,7 +2426,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
}
|
||||
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
|
||||
|
||||
mDebug("msg:%p, app:%p table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d sid:%d", pMsg, pMsg->rpcMsg.ahandle,
|
||||
mDebug("msg:%p, app:%p table:%s, uid:%" PRIu64 " table meta is retrieved, vgId:%d tid:%d", pMsg, pMsg->rpcMsg.ahandle,
|
||||
pTable->info.tableId, pTable->uid, pTable->vgId, pTable->tid);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2809,11 +2818,15 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
//TODO. set the vgroup info for the super table
|
||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
||||
pInfo->loadVgroup = htonl(pInfo->loadVgroup);
|
||||
|
||||
// first malloc 4KB, subsequent reallocation will expand the size as twice of the original size
|
||||
int32_t totalMallocLen = 4 * 1024;
|
||||
|
||||
int32_t totalMallocLen = 4 * 1024 * 1024; // first malloc 4 MB, subsequent reallocation as twice
|
||||
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -2823,13 +2836,21 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
pMultiMeta->numOfTables = 0;
|
||||
|
||||
for (int32_t t = 0; t < pInfo->numOfTables; ++t) {
|
||||
char * tableId = (char *)(pInfo->tableIds + t * TSDB_TABLE_FNAME_LEN);
|
||||
SCTableObj *pTable = mnodeGetChildTable(tableId);
|
||||
if (pTable == NULL) continue;
|
||||
char *fullName = (char *)(pInfo->tableNames + t * TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
if (pMsg->pTable == NULL) {
|
||||
pMsg->pTable = mnodeGetTable(fullName);
|
||||
if (pMsg->pTable == NULL) { // TODO: return error to client?
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (pMsg->pDb == NULL) {
|
||||
pMsg->pDb = mnodeGetDbByTableName(fullName);
|
||||
}
|
||||
|
||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableName(tableId);
|
||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mnodeDecTableRef(pTable);
|
||||
mnodeDecTableRef(pMsg->pTable); // TODO: return error to client?
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2838,23 +2859,26 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
totalMallocLen *= 2;
|
||||
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
mnodeDecTableRef(pTable);
|
||||
mnodeDecTableRef(pMsg->pTable);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
t--;
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen);
|
||||
int32_t code = mnodeDoGetChildTableMeta(pMsg, pMeta);
|
||||
STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->meta + pMultiMeta->contLen);
|
||||
|
||||
int32_t code = 0;
|
||||
if (pMsg->pTable->type != TSDB_SUPER_TABLE) {
|
||||
code = mnodeDoGetChildTableMeta(pMsg, pMeta);
|
||||
} else {
|
||||
code = mnodeDoGetSuperTableMeta(pMsg, pMeta);
|
||||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pMultiMeta->numOfTables ++;
|
||||
pMultiMeta->contLen += pMeta->contLen;
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
mnodeDecTableRef(pMsg->pTable);
|
||||
}
|
||||
|
||||
pMsg->rpcRsp.rsp = pMultiMeta;
|
||||
|
|
Loading…
Reference in New Issue