[TD-9] fix crash while create table
This commit is contained in:
parent
b6313b346f
commit
66de5c58c9
|
@ -1752,7 +1752,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
|
|
||||||
msgLen = pMsg - pStart;
|
msgLen = pMsg - pStart;
|
||||||
pCmd->payloadLen = msgLen;
|
pCmd->payloadLen = msgLen;
|
||||||
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_META;
|
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
|
||||||
assert(msgLen + minMsgSize() <= size);
|
assert(msgLen + minMsgSize() <= size);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -738,8 +738,8 @@ typedef struct {
|
||||||
} SDMConfigTableMsg;
|
} SDMConfigTableMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t dnode;
|
uint32_t dnodeId;
|
||||||
int32_t vnode;
|
int32_t vgId;
|
||||||
} SDMConfigVnodeMsg;
|
} SDMConfigVnodeMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -22,22 +22,23 @@ extern "C" {
|
||||||
|
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
|
|
||||||
void mgmtAddVgroupIntoDb(SVgObj *pVgroup);
|
// api
|
||||||
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup);
|
|
||||||
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup);
|
|
||||||
void mgmtMoveVgroupToTail(SVgObj *pVgroup);
|
|
||||||
void mgmtMoveVgroupToHead(SVgObj *pVgroup);
|
|
||||||
|
|
||||||
int32_t mgmtInitDbs();
|
int32_t mgmtInitDbs();
|
||||||
void mgmtCleanUpDbs();
|
void mgmtCleanUpDbs();
|
||||||
SDbObj *mgmtGetDb(char *db);
|
SDbObj *mgmtGetDb(char *db);
|
||||||
SDbObj *mgmtGetDbByTableId(char *db);
|
SDbObj *mgmtGetDbByTableId(char *db);
|
||||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||||
|
|
||||||
|
// util func
|
||||||
void mgmtAddSuperTableIntoDb(SDbObj *pDb);
|
void mgmtAddSuperTableIntoDb(SDbObj *pDb);
|
||||||
void mgmtRemoveSuperTableFromDb(SDbObj *pDb);
|
void mgmtRemoveSuperTableFromDb(SDbObj *pDb);
|
||||||
void mgmtAddTableIntoDb(SDbObj *pDb);
|
void mgmtAddTableIntoDb(SDbObj *pDb);
|
||||||
void mgmtRemoveTableFromDb(SDbObj *pDb);
|
void mgmtRemoveTableFromDb(SDbObj *pDb);
|
||||||
|
void mgmtAddVgroupIntoDb(SVgObj *pVgroup);
|
||||||
|
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup);
|
||||||
|
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup);
|
||||||
|
void mgmtMoveVgroupToTail(SVgObj *pVgroup);
|
||||||
|
void mgmtMoveVgroupToHead(SVgObj *pVgroup);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ typedef struct {
|
||||||
int32_t (*destroyFp)(SSdbOperDesc *pDesc);
|
int32_t (*destroyFp)(SSdbOperDesc *pDesc);
|
||||||
} SSdbTableDesc;
|
} SSdbTableDesc;
|
||||||
|
|
||||||
void *sdbOpenTable(SSdbTableDesc *desc);
|
void * sdbOpenTable(SSdbTableDesc *desc);
|
||||||
void sdbCloseTable(void *handle);
|
void sdbCloseTable(void *handle);
|
||||||
|
|
||||||
int32_t sdbInsertRow(SSdbOperDesc *pOper);
|
int32_t sdbInsertRow(SSdbOperDesc *pOper);
|
||||||
|
|
|
@ -27,12 +27,10 @@ extern "C" {
|
||||||
int32_t mgmtInitVgroups();
|
int32_t mgmtInitVgroups();
|
||||||
void mgmtCleanUpVgroups();
|
void mgmtCleanUpVgroups();
|
||||||
SVgObj *mgmtGetVgroup(int32_t vgId);
|
SVgObj *mgmtGetVgroup(int32_t vgId);
|
||||||
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
|
|
||||||
void mgmtDropAllVgroups(SDbObj *pDropDb);
|
void mgmtDropAllVgroups(SDbObj *pDropDb);
|
||||||
|
|
||||||
void mgmtCreateVgroup(SQueuedMsg *pMsg);
|
void mgmtCreateVgroup(SQueuedMsg *pMsg);
|
||||||
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
|
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
|
||||||
void mgmtUpdateVgroup(SVgObj *pVgroup);
|
|
||||||
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
|
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
|
||||||
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
|
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
|
||||||
|
|
||||||
|
|
|
@ -332,7 +332,6 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN + 1);
|
memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN + 1);
|
||||||
memcpy(pCreate->superTableId, pTable->superTable->info.tableId, TSDB_TABLE_ID_LEN + 1);
|
|
||||||
pCreate->contLen = htonl(contLen);
|
pCreate->contLen = htonl(contLen);
|
||||||
pCreate->vgId = htonl(pTable->vgId);
|
pCreate->vgId = htonl(pTable->vgId);
|
||||||
pCreate->tableType = pTable->info.type;
|
pCreate->tableType = pTable->info.type;
|
||||||
|
@ -342,6 +341,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
|
||||||
pCreate->uid = htobe64(pTable->uid);
|
pCreate->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
if (pTable->info.type == TSDB_CHILD_TABLE) {
|
||||||
|
memcpy(pCreate->superTableId, pTable->superTable->info.tableId, TSDB_TABLE_ID_LEN + 1);
|
||||||
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
|
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
|
||||||
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
|
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
|
||||||
pCreate->sversion = htonl(pTable->superTable->sversion);
|
pCreate->sversion = htonl(pTable->superTable->sversion);
|
||||||
|
|
|
@ -41,7 +41,6 @@ static int32_t tsDbUpdateSize;
|
||||||
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
|
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
|
||||||
static void mgmtDropDb(void *handle, void *tmrId);
|
static void mgmtDropDb(void *handle, void *tmrId);
|
||||||
static int32_t mgmtSetDbDirty(SDbObj *pDb);
|
static int32_t mgmtSetDbDirty(SDbObj *pDb);
|
||||||
|
|
||||||
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
|
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
#include "mgmtShell.h"
|
#include "mgmtShell.h"
|
||||||
#include "mgmtUser.h"
|
#include "mgmtUser.h"
|
||||||
|
|
||||||
void *tsUserSdb = NULL;
|
static void *tsUserSdb = NULL;
|
||||||
static int32_t tsUserUpdateSize = 0;
|
static int32_t tsUserUpdateSize = 0;
|
||||||
|
|
||||||
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||||
|
@ -313,7 +313,9 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
|
||||||
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) {
|
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) {
|
||||||
SRpcConnInfo connInfo;
|
SRpcConnInfo connInfo;
|
||||||
if (rpcGetConnInfo(pConn, &connInfo) == 0) {
|
if (rpcGetConnInfo(pConn, &connInfo) == 0) {
|
||||||
|
if (usePublicIp) {
|
||||||
*usePublicIp = (connInfo.serverIp == tsPublicIpInt);
|
*usePublicIp = (connInfo.serverIp == tsPublicIpInt);
|
||||||
|
}
|
||||||
return mgmtGetUser(connInfo.user);
|
return mgmtGetUser(connInfo.user);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,8 +67,8 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
|
||||||
pVgroup->prev = NULL;
|
pVgroup->prev = NULL;
|
||||||
pVgroup->next = NULL;
|
pVgroup->next = NULL;
|
||||||
|
|
||||||
int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
|
int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxSessions;
|
||||||
pVgroup->tableList = (STableInfo **)calloc(pDb->cfg.maxSessions, sizeof(STableInfo *));
|
pVgroup->tableList = calloc(pDb->cfg.maxSessions, sizeof(SChildTableObj *));
|
||||||
if (pVgroup->tableList == NULL) {
|
if (pVgroup->tableList == NULL) {
|
||||||
mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
|
mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -112,8 +112,8 @@ static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) {
|
||||||
if (pDb->cfg.maxSessions != oldTables) {
|
if (pDb->cfg.maxSessions != oldTables) {
|
||||||
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
|
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
|
||||||
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
|
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
|
||||||
int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions;
|
int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxSessions;
|
||||||
pVgroup->tableList = (STableInfo **)realloc(pVgroup->tableList, size);
|
pVgroup->tableList = (SChildTableObj **)realloc(pVgroup->tableList, size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,17 +483,14 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
||||||
return pVnode;
|
return pVnode;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
|
static SVgObj *mgmtGetVgroupInDnode(uint32_t dnodeId, int32_t vgId) {
|
||||||
if (vnode < 0 || vnode >= TSDB_MAX_VNODES) {
|
if (vnovgId < 1 || dnodeId < 1) return NULL;
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(dnode);
|
SDnodeObj *pDnode = mgmtGetDnode(dnodeId);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vgId = pDnode->vload[vnode].vgId;
|
|
||||||
return mgmtGetVgroup(vgId);
|
return mgmtGetVgroup(vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -660,10 +657,10 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
if (mgmtCheckRedirect(rpcMsg->handle)) return;
|
if (mgmtCheckRedirect(rpcMsg->handle)) return;
|
||||||
|
|
||||||
SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) rpcMsg->pCont;
|
SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) rpcMsg->pCont;
|
||||||
pCfg->dnode = htonl(pCfg->dnode);
|
pCfg->dnodeId = htonl(pCfg->dnode);
|
||||||
pCfg->vnode = htonl(pCfg->vnode);
|
pCfg->vgId = htonl(pCfg->vnode);
|
||||||
|
|
||||||
SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode);
|
SVgObj *pVgroup = mgmtGetVgroupInDnode(pCfg->dnodeId, pCfg->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
|
mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode);
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
|
||||||
|
|
|
@ -360,7 +360,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
|
||||||
// for TDengine, all the query, show commands shall have TCP connection
|
// for TDengine, all the query, show commands shall have TCP connection
|
||||||
char type = pMsg->msgType;
|
char type = pMsg->msgType;
|
||||||
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
|
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
|
||||||
type == TSDB_MSG_TYPE_CM_STABLE_META || type == TSDB_MSG_TYPE_CM_TABLES_META ||
|
type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META ||
|
||||||
type == TSDB_MSG_TYPE_CM_SHOW )
|
type == TSDB_MSG_TYPE_CM_SHOW )
|
||||||
pContext->connType = RPC_CONN_TCPC;
|
pContext->connType = RPC_CONN_TCPC;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue