commit
6b32b5e567
|
@ -353,7 +353,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
|
|||
pRes->numOfGroups = 0;
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);;
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||
|
||||
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
|
||||
int64_t revisedSTime =
|
||||
|
|
|
@ -1473,7 +1473,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pMsg += len;
|
||||
}
|
||||
|
||||
pCmd->payloadLen = pMsg - (char*)pInfoMsg;;
|
||||
pCmd->payloadLen = pMsg - (char*)pInfoMsg;
|
||||
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
|
||||
|
||||
tfree(tmpData);
|
||||
|
|
|
@ -499,7 +499,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
|||
if (pSql->sqlstr == NULL) {
|
||||
tscError("%p failed to malloc sql string buffer", pSql);
|
||||
tscFreeSqlObj(pSql);
|
||||
return NULL;;
|
||||
return NULL;
|
||||
}
|
||||
strtolower(pSql->sqlstr, sqlstr);
|
||||
|
||||
|
|
|
@ -118,6 +118,8 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
|
|||
|
||||
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
|
||||
mnodeCreateMsg(pWrite, pMsg);
|
||||
|
||||
dTrace("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
|
||||
}
|
||||
|
||||
|
@ -128,6 +130,7 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
|
|||
|
||||
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
|
||||
SMnodeMsg *pWrite = pRaw;
|
||||
if (pWrite == NULL) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
|
||||
dnodeReprocessMnodeWriteMsg(pWrite);
|
||||
|
@ -146,19 +149,21 @@ void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
|
|||
}
|
||||
|
||||
static void *dnodeProcessMnodeWriteQueue(void *param) {
|
||||
SMnodeMsg *pWriteMsg;
|
||||
SMnodeMsg *pWrite;
|
||||
int32_t type;
|
||||
void * unUsed;
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWriteMsg, &unUsed) == 0) {
|
||||
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
|
||||
dTrace("dnodeProcessMnodeWriteQueue: got no message from qset, exiting...");
|
||||
break;
|
||||
}
|
||||
|
||||
dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]);
|
||||
int32_t code = mnodeProcessWrite(pWriteMsg);
|
||||
dnodeSendRpcMnodeWriteRsp(pWriteMsg, code);
|
||||
dTrace("app:%p:%p, msg:%s will be processed in mwrite queue", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType]);
|
||||
|
||||
int32_t code = mnodeProcessWrite(pWrite);
|
||||
dnodeSendRpcMnodeWriteRsp(pWrite, code);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -168,9 +173,15 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
|
|||
SMnodeMsg *pWrite = pMsg;
|
||||
|
||||
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
|
||||
dTrace("app:%p:%p, msg:%s is redirected for mnode not running, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
|
||||
|
||||
dnodeSendRedirectMsg(pMsg, true);
|
||||
dnodeFreeMnodeWriteMsg(pWrite);
|
||||
} else {
|
||||
dTrace("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
|
||||
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
|
||||
|
||||
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -113,8 +113,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "mnode inva
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "mnode invalid stream id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "mnode invalid connection")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "mnode object already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "mnode sdb error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "[sdb] object already there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "[sdb] app error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE, 0, 0x0322, "[sdb] invalid table type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_NOT_THERE, 0, 0x0323, "[sdb] object not there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_META_ROW, 0, 0x0324, "[sdb] invalid meta row")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_INVAID_KEY_TYPE, 0, 0x0325, "[sdb] invalid key type")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, 0, 0x0330, "mnode dnode already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, 0, 0x0331, "mnode dnode not exist")
|
||||
|
|
|
@ -47,7 +47,7 @@ void mnodeDecDnodeRef(SDnodeObj *pDnode);
|
|||
void * mnodeGetDnode(int32_t dnodeId);
|
||||
void * mnodeGetDnodeByEp(char *ep);
|
||||
void mnodeUpdateDnode(SDnodeObj *pDnode);
|
||||
int32_t mnodeDropDnode(SDnodeObj *pDnode);
|
||||
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg);
|
||||
|
||||
extern int32_t tsAccessSquence;
|
||||
|
||||
|
|
|
@ -36,10 +36,10 @@ extern int32_t sdbDebugFlag;
|
|||
#define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
|
||||
#define mLPrint(...) { monitorSaveLog(0, __VA_ARGS__); mPrint(__VA_ARGS__) }
|
||||
|
||||
#define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR MND-SDB ", 255, __VA_ARGS__); }}
|
||||
#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); }}
|
||||
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("MND-SDB ", sdbDebugFlag, __VA_ARGS__);}}
|
||||
#define sdbPrint(...) { taosPrintLog("MND-SDB ", 255, __VA_ARGS__); }
|
||||
#define sdbError(...) { if (sdbDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR SDB ", 255, __VA_ARGS__); }}
|
||||
#define sdbWarn(...) { if (sdbDebugFlag & DEBUG_WARN) { taosPrintLog("WARN SDB ", sdbDebugFlag, __VA_ARGS__); }}
|
||||
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__);}}
|
||||
#define sdbPrint(...) { taosPrintLog("SDB ", 255, __VA_ARGS__); }
|
||||
|
||||
#define sdbLError(...) { monitorSaveLog(2, __VA_ARGS__); sdbError(__VA_ARGS__) }
|
||||
#define sdbLWarn(...) { monitorSaveLog(1, __VA_ARGS__); sdbWarn(__VA_ARGS__) }
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct SMnodeMsg;
|
||||
|
||||
typedef enum {
|
||||
SDB_TABLE_DNODE = 0,
|
||||
SDB_TABLE_MNODE = 1,
|
||||
|
@ -48,8 +50,11 @@ typedef struct {
|
|||
ESdbOper type;
|
||||
void * table;
|
||||
void * pObj;
|
||||
int32_t rowSize;
|
||||
void * rowData;
|
||||
int32_t rowSize;
|
||||
int32_t retCode; // for callback in sdb queue
|
||||
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
|
||||
struct SMnodeMsg *pMsg;
|
||||
} SSdbOper;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -28,7 +28,8 @@ void * mnodeGetNextUser(void *pIter, SUserObj **pUser);
|
|||
void mnodeIncUserRef(SUserObj *pUser);
|
||||
void mnodeDecUserRef(SUserObj *pUser);
|
||||
SUserObj *mnodeGetUserFromConn(void *pConn);
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||
char * mnodeGetUserFromMsg(void *pMnodeMsg);
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg);
|
||||
void mnodeDropAllUsers(SAcctObj *pAcct);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -78,7 +78,9 @@ static int32_t mnodeAcctActionDecode(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
static int32_t mnodeAcctActionRestored() {
|
||||
if (dnodeIsFirstDeploy()) {
|
||||
int32_t numOfRows = sdbGetNumOfRows(tsAcctSdb);
|
||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||
mPrint("dnode first deploy, create root acct");
|
||||
int32_t code = mnodeCreateRootAcct();
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to create root account, reason:%s", tstrerror(code));
|
||||
|
|
|
@ -41,7 +41,7 @@
|
|||
static void * tsDbSdb = NULL;
|
||||
static int32_t tsDbUpdateSize;
|
||||
|
||||
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
|
||||
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg);
|
||||
static int32_t mnodeDropDb(SMnodeMsg *newMsg);
|
||||
static int32_t mnodeSetDbDropping(SDbObj *pDb);
|
||||
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||
|
@ -311,7 +311,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->replications < 0) pCfg->replications = tsReplications;
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
|
||||
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg) {
|
||||
int32_t code = acctCheck(pAcct, ACCT_GRANT_DB);
|
||||
if (code != 0) return code;
|
||||
|
||||
|
@ -364,12 +364,15 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
|
|||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
.rowSize = sizeof(SDbObj),
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tfree(pDb);
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
} else {
|
||||
mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -771,12 +774,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
|
|||
} else if (!pMsg->pUser->writeAuth) {
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
} else {
|
||||
code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user);
|
||||
} else {
|
||||
mError("db:%s, failed to create, reason:%s", pCreate->db, tstrerror(code));
|
||||
}
|
||||
code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate, pMsg);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -893,7 +891,31 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
return newCfg;
|
||||
}
|
||||
|
||||
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
||||
static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
|
||||
if (pVgroup == NULL) break;
|
||||
if (pVgroup->pDb == pDb) {
|
||||
mnodeSendCreateVgroupMsg(pVgroup, NULL);
|
||||
}
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
}
|
||||
sdbFreeIter(pIter);
|
||||
|
||||
mTrace("db:%s, all vgroups is altered", pDb->name);
|
||||
mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
|
||||
balanceNotify();
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
|
||||
SDbCfg newCfg = mnodeGetAlterDbOption(pDb, pAlter);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
return terrno;
|
||||
|
@ -904,38 +926,24 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t oldReplica = pDb->cfg.replications;
|
||||
|
||||
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
|
||||
pDb->cfg = newCfg;
|
||||
pDb->cfgVersion++;
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb
|
||||
.pObj = pDb,
|
||||
.pMsg = pMsg,
|
||||
.cb = mnodeAlterDbCb
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
code = sdbUpdateRow(&oper);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
|
||||
if (pVgroup == NULL) break;
|
||||
mnodeSendCreateVgroupMsg(pVgroup, NULL);
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
}
|
||||
sdbFreeIter(pIter);
|
||||
|
||||
if (oldReplica != pDb->cfg.replications) {
|
||||
balanceNotify();
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -948,14 +956,7 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_INVALID_DB;
|
||||
}
|
||||
|
||||
int32_t code = mnodeAlterDb(pMsg->pDb, pAlter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("db:%s, failed to alter, invalid db option", pAlter->db);
|
||||
return code;
|
||||
}
|
||||
|
||||
mTrace("db:%s, all vgroups is altered", pMsg->pDb->name);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return mnodeAlterDb(pMsg->pDb, pAlter, pMsg);
|
||||
}
|
||||
|
||||
static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
|
||||
|
@ -965,11 +966,14 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb
|
||||
.pObj = pDb,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code != 0) {
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -44,7 +44,7 @@ static int32_t tsDnodeUpdateSize = 0;
|
|||
extern void * tsMnodeSdb;
|
||||
extern void * tsVgroupSdb;
|
||||
|
||||
static int32_t mnodeCreateDnode(char *ep);
|
||||
static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg);
|
||||
static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg);
|
||||
|
@ -117,7 +117,8 @@ static int32_t mnodeDnodeActionDecode(SSdbOper *pOper) {
|
|||
static int32_t mnodeDnodeActionRestored() {
|
||||
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
|
||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||
mnodeCreateDnode(tsLocalEp);
|
||||
mPrint("dnode first deploy, create dnode:%s", tsLocalEp);
|
||||
mnodeCreateDnode(tsLocalEp, NULL);
|
||||
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
|
||||
mnodeAddMnode(pDnode->dnodeId);
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
|
@ -391,7 +392,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateDnode(char *ep) {
|
||||
static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
|
||||
int32_t grantCode = grantCheck(TSDB_GRANT_DNODE);
|
||||
if (grantCode != TSDB_CODE_SUCCESS) {
|
||||
return grantCode;
|
||||
|
@ -415,7 +416,8 @@ static int32_t mnodeCreateDnode(char *ep) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDnodeSdb,
|
||||
.pObj = pDnode,
|
||||
.rowSize = sizeof(SDnodeObj)
|
||||
.rowSize = sizeof(SDnodeObj),
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&oper);
|
||||
|
@ -423,30 +425,32 @@ static int32_t mnodeCreateDnode(char *ep) {
|
|||
int dnodeId = pDnode->dnodeId;
|
||||
tfree(pDnode);
|
||||
mError("failed to create dnode:%d, result:%s", dnodeId, tstrerror(code));
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
} else {
|
||||
mPrint("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mnodeDropDnode(SDnodeObj *pDnode) {
|
||||
int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDnodeSdb,
|
||||
.pObj = pDnode
|
||||
.pObj = pDnode,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mLPrint("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropDnodeByEp(char *ep) {
|
||||
static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
|
||||
SDnodeObj *pDnode = mnodeGetDnodeByEp(ep);
|
||||
if (pDnode == NULL) {
|
||||
mError("dnode:%s, is not exist", ep);
|
||||
|
@ -461,7 +465,7 @@ static int32_t mnodeDropDnodeByEp(char *ep) {
|
|||
|
||||
mPrint("dnode:%d, start to drop it", pDnode->dnodeId);
|
||||
#ifndef _SYNC
|
||||
return mnodeDropDnode(pDnode);
|
||||
return mnodeDropDnode(pDnode, pMsg);
|
||||
#else
|
||||
return balanceDropDnode(pDnode);
|
||||
#endif
|
||||
|
@ -473,17 +477,7 @@ static int32_t mnodeProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
|
|||
if (strcmp(pMsg->pUser->user, "root") != 0) {
|
||||
return TSDB_CODE_MND_NO_RIGHTS;
|
||||
} else {
|
||||
int32_t code = mnodeCreateDnode(pCreate->ep);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SDnodeObj *pDnode = mnodeGetDnodeByEp(pCreate->ep);
|
||||
mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user);
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
} else {
|
||||
mError("failed to create dnode:%s, reason:%s", pCreate->ep, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
return mnodeCreateDnode(pCreate->ep, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -493,15 +487,7 @@ static int32_t mnodeProcessDropDnodeMsg(SMnodeMsg *pMsg) {
|
|||
if (strcmp(pMsg->pUser->user, "root") != 0) {
|
||||
return TSDB_CODE_MND_NO_RIGHTS;
|
||||
} else {
|
||||
int32_t code = mnodeDropDnodeByEp(pDrop->ep);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("dnode:%s is dropped by %s", pDrop->ep, pMsg->pUser->user);
|
||||
} else {
|
||||
mError("failed to drop dnode:%s, reason:%s", pDrop->ep, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
return mnodeDropDnodeByEp(pDrop->ep, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "tsync.h"
|
||||
#include "tglobal.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeMnode.h"
|
||||
|
@ -31,6 +32,7 @@
|
|||
#include "mnodeSdb.h"
|
||||
|
||||
#define SDB_TABLE_LEN 12
|
||||
#define SDB_SYNC_HACK 16
|
||||
|
||||
typedef enum {
|
||||
SDB_ACTION_INSERT,
|
||||
|
@ -83,8 +85,29 @@ typedef struct {
|
|||
void * row;
|
||||
} SSdbRow;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread;
|
||||
int32_t workerId;
|
||||
} SSdbWriteWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SSdbWriteWorker *writeWorker;
|
||||
} SSdbWriteWorkerPool;
|
||||
|
||||
static SSdbObject tsSdbObj = {0};
|
||||
static taos_qset tsSdbWriteQset;
|
||||
static taos_qall tsSdbWriteQall;
|
||||
static taos_queue tsSdbWriteQueue;
|
||||
static SSdbWriteWorkerPool tsSdbPool;
|
||||
|
||||
static int sdbWrite(void *param, void *data, int type);
|
||||
static int sdbWriteToQueue(void *param, void *data, int type);
|
||||
static void * sdbWorkerFp(void *param);
|
||||
static int32_t sdbInitWriteWorker();
|
||||
static void sdbCleanupWriteWorker();
|
||||
static int32_t sdbAllocWriteQueue();
|
||||
static void sdbFreeWritequeue();
|
||||
|
||||
int32_t sdbGetId(void *handle) {
|
||||
return ((SSdbTable *)handle)->autoIndex;
|
||||
|
@ -302,7 +325,7 @@ void sdbUpdateSync() {
|
|||
syncInfo.ahandle = NULL;
|
||||
syncInfo.getWalInfo = sdbGetWalInfo;
|
||||
syncInfo.getFileInfo = sdbGetFileInfo;
|
||||
syncInfo.writeToCache = sdbWrite;
|
||||
syncInfo.writeToCache = sdbWriteToQueue;
|
||||
syncInfo.confirmForward = sdbConfirmForward;
|
||||
syncInfo.notifyRole = sdbNotifyRole;
|
||||
tsSdbObj.cfg = syncCfg;
|
||||
|
@ -319,6 +342,10 @@ int32_t sdbInit() {
|
|||
pthread_mutex_init(&tsSdbObj.mutex, NULL);
|
||||
sem_init(&tsSdbObj.sem, 0, 0);
|
||||
|
||||
if (sdbInitWriteWorker() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sdbInitWal() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -340,6 +367,8 @@ void sdbCleanUp() {
|
|||
|
||||
tsSdbObj.status = SDB_STATUS_CLOSING;
|
||||
|
||||
sdbCleanupWriteWorker();
|
||||
|
||||
if (tsSdbObj.sync) {
|
||||
syncStop(tsSdbObj.sync);
|
||||
tsSdbObj.sync = NULL;
|
||||
|
@ -475,7 +504,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
pTable->numOfRows--;
|
||||
pthread_mutex_unlock(&pTable->mutex);
|
||||
|
||||
sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 "version:%" PRIu64, pTable->tableName,
|
||||
sdbTrace("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
|
||||
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());
|
||||
|
||||
int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1;
|
||||
|
@ -494,6 +523,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
static int sdbWrite(void *param, void *data, int type) {
|
||||
SSdbOper *pOper = param;
|
||||
SWalHead *pHead = data;
|
||||
int32_t tableId = pHead->msgType / 10;
|
||||
int32_t action = pHead->msgType % 10;
|
||||
|
@ -531,21 +561,22 @@ static int sdbWrite(void *param, void *data, int type) {
|
|||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||
return code;
|
||||
}
|
||||
walFsync(tsSdbObj.wal);
|
||||
|
||||
code = sdbForwardToPeer(pHead);
|
||||
pthread_mutex_unlock(&tsSdbObj.mutex);
|
||||
|
||||
// from app, oper is created
|
||||
if (param != NULL) {
|
||||
//sdbTrace("request from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code));
|
||||
if (pOper != NULL) {
|
||||
sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
// from wal or forward msg, oper not created, should add into hash
|
||||
if (tsSdbObj.sync != NULL) {
|
||||
sdbTrace("forward request is received, version:%" PRIu64 " result:%s, confirm it", pHead->version, tstrerror(code));
|
||||
sdbTrace("record from wal forward is disposed, version:%" PRIu64 " confirm it", pHead->version);
|
||||
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
|
||||
} else {
|
||||
sdbTrace("record from wal restore is disposed, version:%" PRIu64 , pHead->version);
|
||||
}
|
||||
|
||||
if (action == SDB_ACTION_INSERT) {
|
||||
|
@ -568,7 +599,7 @@ static int sdbWrite(void *param, void *data, int type) {
|
|||
|
||||
int32_t sdbInsertRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
if (sdbGetRowFromObj(pTable, pOper->pObj)) {
|
||||
sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj));
|
||||
|
@ -587,9 +618,21 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|||
pthread_mutex_unlock(&pTable->mutex);
|
||||
}
|
||||
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SWalHead *pHead = taosAllocateQitem(size);
|
||||
int32_t code = sdbInsertHash(pTable, pOper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("table:%s, failed to insert into hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just insert data into memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
pHead->version = 0;
|
||||
pHead->len = pOper->rowSize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
|
||||
|
@ -598,28 +641,43 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
|
|||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
|
||||
taosFreeQitem(pHead);
|
||||
if (code < 0) return code;
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
|
||||
}
|
||||
|
||||
return sdbInsertHash(pTable, pOper);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sdbDeleteRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
|
||||
if (pMeta == NULL) {
|
||||
sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
|
||||
return -1;
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
void * pMetaRow = pMeta->row;
|
||||
assert(pMetaRow != NULL);
|
||||
void *pMetaRow = pMeta->row;
|
||||
if (pMetaRow == NULL) {
|
||||
sdbError("table:%s, record meta is null", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
|
||||
}
|
||||
|
||||
int32_t code = sdbDeleteHash(pTable, pOper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("table:%s, failed to delete from hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just delete data from memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
void * key = sdbGetObjKey(pTable, pOper->pObj);
|
||||
int32_t keySize = 0;
|
||||
switch (pTable->keyType) {
|
||||
|
@ -632,40 +690,59 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
|
|||
keySize = sizeof(uint32_t);
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SWalHead) + keySize;
|
||||
SWalHead *pHead = taosAllocateQitem(size);
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize + SDB_SYNC_HACK;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
pHead->version = 0;
|
||||
pHead->len = keySize;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
|
||||
memcpy(pHead->cont, key, keySize);
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
|
||||
taosFreeQitem(pHead);
|
||||
if (code < 0) return code;
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
|
||||
}
|
||||
|
||||
return sdbDeleteHash(pTable, pOper);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sdbUpdateRow(SSdbOper *pOper) {
|
||||
SSdbTable *pTable = (SSdbTable *)pOper->table;
|
||||
if (pTable == NULL) return -1;
|
||||
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
|
||||
|
||||
SSdbRow *pMeta = sdbGetRowMetaFromObj(pTable, pOper->pObj);
|
||||
if (pMeta == NULL) {
|
||||
sdbTrace("table:%s, record is not there, delete failed", pTable->tableName);
|
||||
return -1;
|
||||
sdbTrace("table:%s, record is not there, update failed", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_OBJ_NOT_THERE;
|
||||
}
|
||||
|
||||
void * pMetaRow = pMeta->row;
|
||||
assert(pMetaRow != NULL);
|
||||
void *pMetaRow = pMeta->row;
|
||||
if (pMetaRow == NULL) {
|
||||
sdbError("table:%s, record meta is null", pTable->tableName);
|
||||
return TSDB_CODE_MND_SDB_INVAID_META_ROW;
|
||||
}
|
||||
|
||||
if (pOper->type == SDB_OPER_GLOBAL) {
|
||||
int32_t size = sizeof(SWalHead) + pTable->maxRowSize;
|
||||
SWalHead *pHead = taosAllocateQitem(size);
|
||||
int32_t code = sdbUpdateHash(pTable, pOper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbError("table:%s, failed to update hash", pTable->tableName);
|
||||
return code;
|
||||
}
|
||||
|
||||
// just update data in memory
|
||||
if (pOper->type != SDB_OPER_GLOBAL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
|
||||
SSdbOper *pNewOper = taosAllocateQitem(size);
|
||||
|
||||
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
pHead->version = 0;
|
||||
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
|
||||
|
||||
|
@ -673,12 +750,14 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
|
|||
(*pTable->encodeFp)(pOper);
|
||||
pHead->len = pOper->rowSize;
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, pHead->msgType);
|
||||
taosFreeQitem(pHead);
|
||||
if (code < 0) return code;
|
||||
memcpy(pNewOper, pOper, sizeof(SSdbOper));
|
||||
|
||||
if (pNewOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle, pNewOper->pMsg);
|
||||
}
|
||||
|
||||
return sdbUpdateHash(pTable, pOper);
|
||||
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
|
||||
|
@ -775,3 +854,158 @@ void sdbCloseTable(void *handle) {
|
|||
free(pTable);
|
||||
}
|
||||
|
||||
int32_t sdbInitWriteWorker() {
|
||||
tsSdbPool.num = 1;
|
||||
tsSdbPool.writeWorker = (SSdbWriteWorker *)calloc(sizeof(SSdbWriteWorker), tsSdbPool.num);
|
||||
|
||||
if (tsSdbPool.writeWorker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
pWorker->workerId = i;
|
||||
}
|
||||
|
||||
sdbAllocWriteQueue();
|
||||
|
||||
mPrint("sdb write is opened");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void sdbCleanupWriteWorker() {
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsSdbWriteQset);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
sdbFreeWritequeue();
|
||||
|
||||
mPrint("sdb write is closed");
|
||||
}
|
||||
|
||||
int32_t sdbAllocWriteQueue() {
|
||||
tsSdbWriteQueue = taosOpenQueue();
|
||||
if (tsSdbWriteQueue == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
||||
tsSdbWriteQset = taosOpenQset();
|
||||
if (tsSdbWriteQset == NULL) {
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
taosAddIntoQset(tsSdbWriteQset, tsSdbWriteQueue, NULL);
|
||||
|
||||
tsSdbWriteQall = taosAllocateQall();
|
||||
if (tsSdbWriteQall == NULL) {
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsSdbPool.num; ++i) {
|
||||
SSdbWriteWorker *pWorker = tsSdbPool.writeWorker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, sdbWorkerFp, pWorker) != 0) {
|
||||
mError("failed to create thread to process sdb write queue, reason:%s", strerror(errno));
|
||||
taosFreeQall(tsSdbWriteQall);
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
taosCloseQueue(tsSdbWriteQueue);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
mTrace("sdb write worker:%d is launched, total:%d", pWorker->workerId, tsSdbPool.num);
|
||||
}
|
||||
|
||||
mTrace("sdb write queue:%p is allocated", tsSdbWriteQueue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void sdbFreeWritequeue() {
|
||||
taosCloseQset(tsSdbWriteQueue);
|
||||
taosFreeQall(tsSdbWriteQall);
|
||||
taosCloseQset(tsSdbWriteQset);
|
||||
tsSdbWriteQall = NULL;
|
||||
tsSdbWriteQset = NULL;
|
||||
tsSdbWriteQueue = NULL;
|
||||
}
|
||||
|
||||
int sdbWriteToQueue(void *param, void *data, int type) {
|
||||
SWalHead *pHead = data;
|
||||
int size = sizeof(SWalHead) + pHead->len;
|
||||
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
||||
memcpy(pWal, pHead, size);
|
||||
|
||||
taosWriteQitem(tsSdbWriteQueue, type, pWal);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *sdbWorkerFp(void *param) {
|
||||
SWalHead *pHead;
|
||||
SSdbOper *pOper;
|
||||
int32_t type;
|
||||
int32_t numOfMsgs;
|
||||
void * item;
|
||||
void * unUsed;
|
||||
|
||||
while (1) {
|
||||
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
|
||||
if (numOfMsgs == 0) {
|
||||
sdbTrace("sdbWorkerFp: got no message from qset, exiting...");
|
||||
break;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWriteQall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
|
||||
} else {
|
||||
pHead = (SWalHead *)item;
|
||||
pOper = NULL;
|
||||
}
|
||||
|
||||
if (pOper != NULL && pOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, will be processed in sdb queue", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg);
|
||||
}
|
||||
|
||||
int32_t code = sdbWrite(pOper, pHead, type);
|
||||
if (pOper) pOper->retCode = code;
|
||||
}
|
||||
|
||||
walFsync(tsSdbObj.wal);
|
||||
|
||||
// browse all items, and process them one by one
|
||||
taosResetQitems(tsSdbWriteQall);
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(tsSdbWriteQall, &type, &item);
|
||||
if (type == TAOS_QTYPE_RPC) {
|
||||
pOper = (SSdbOper *)item;
|
||||
if (pOper != NULL && pOper->cb != NULL) {
|
||||
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
|
||||
}
|
||||
|
||||
if (pOper != NULL && pOper->pMsg != NULL) {
|
||||
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
|
||||
tstrerror(pOper->retCode));
|
||||
}
|
||||
|
||||
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
|
||||
}
|
||||
taosFreeQitem(item);
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -669,29 +669,32 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreate->db);
|
||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("table:%s, failed to create, db not selected", pCreate->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to create, db not selected", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId);
|
||||
if (pMsg->pTable != NULL && pMsg->retry == 0) {
|
||||
if (pCreate->getMeta) {
|
||||
mTrace("table:%s, continue to get meta", pCreate->tableId);
|
||||
mTrace("app:%p:%p, table:%s, continue to get meta", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return mnodeGetChildTableMeta(pMsg);
|
||||
} else if (pCreate->igExists) {
|
||||
mTrace("table:%s, is already exist", pCreate->tableId);
|
||||
mTrace("app:%p:%p, table:%s, is already exist", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("table:%s, failed to create, table already exist", pCreate->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to create, table already exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pCreate->tableId);
|
||||
return TSDB_CODE_MND_TABLE_ALREADY_EXIST;
|
||||
}
|
||||
}
|
||||
|
||||
if (pCreate->numOfTags != 0) {
|
||||
mTrace("table:%s, create stable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
|
||||
mTrace("app:%p:%p, table:%s, create stable msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pCreate->tableId, pMsg->rpcMsg.handle);
|
||||
return mnodeProcessCreateSuperTableMsg(pMsg);
|
||||
} else {
|
||||
mTrace("table:%s, create ctable msg is received from thandle:%p", pCreate->tableId, pMsg->rpcMsg.handle);
|
||||
mTrace("app:%p:%p, table:%s, create ctable msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pCreate->tableId, pMsg->rpcMsg.handle);
|
||||
return mnodeProcessCreateChildTableMsg(pMsg);
|
||||
}
|
||||
}
|
||||
|
@ -700,31 +703,32 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
|
|||
SCMDropTableMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pDrop->tableId);
|
||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("table:%s, failed to drop table, db not selected", pDrop->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to drop table, db not selected", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
if (mnodeCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
|
||||
mError("table:%s, failed to drop table, in monitor database", pDrop->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to drop table, in monitor database", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pDrop->tableId);
|
||||
return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN;
|
||||
}
|
||||
|
||||
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pDrop->tableId);
|
||||
if (pMsg->pTable == NULL) {
|
||||
if (pDrop->igNotExists) {
|
||||
mTrace("table:%s, table is not exist, think drop success", pDrop->tableId);
|
||||
mTrace("app:%p:%p, table:%s, table is not exist, think drop success", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("table:%s, failed to drop table, table not exist", pDrop->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to drop table, table not exist", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
}
|
||||
|
||||
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
|
||||
mPrint("table:%s, start to drop stable", pDrop->tableId);
|
||||
mPrint("app:%p:%p, table:%s, start to drop stable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||
return mnodeProcessDropSuperTableMsg(pMsg);
|
||||
} else {
|
||||
mPrint("table:%s, start to drop ctable", pDrop->tableId);
|
||||
mPrint("app:%p:%p, table:%s, start to drop ctable", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||
return mnodeProcessDropChildTableMsg(pMsg);
|
||||
}
|
||||
}
|
||||
|
@ -732,21 +736,25 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
|
|||
static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
|
||||
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||
pInfo->createFlag = htons(pInfo->createFlag);
|
||||
mTrace("table:%s, table meta msg is received from thandle:%p, createFlag:%d", pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag);
|
||||
mTrace("app:%p:%p, table:%s, table meta msg is received from thandle:%p, createFlag:%d", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag);
|
||||
|
||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pInfo->tableId);
|
||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("table:%s, failed to get table meta, db not selected", pInfo->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to get table meta, db not selected", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pInfo->tableId);
|
||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pInfo->tableId);
|
||||
if (pMsg->pTable == NULL) {
|
||||
if (!pInfo->createFlag) {
|
||||
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to get table meta, table not exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pInfo->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
} else {
|
||||
mTrace("table:%s, failed to get table meta, start auto create table ", pInfo->tableId);
|
||||
mTrace("app:%p:%p, table:%s, failed to get table meta, start auto create table ", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pInfo->tableId);
|
||||
return mnodeAutoCreateChildTable(pMsg);
|
||||
}
|
||||
} else {
|
||||
|
@ -760,9 +768,9 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SSuperTableObj *pStable = calloc(1, sizeof(SSuperTableObj));
|
||||
SSuperTableObj * pStable = calloc(1, sizeof(SSuperTableObj));
|
||||
if (pStable == NULL) {
|
||||
mError("table:%s, failed to create, no enough memory", pCreate->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to create, no enough memory", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -780,7 +788,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
pStable->schema = (SSchema *)calloc(1, schemaSize);
|
||||
if (pStable->schema == NULL) {
|
||||
free(pStable);
|
||||
mError("table:%s, failed to create, no schema input", pCreate->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to create, no schema input", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
|
||||
|
@ -799,18 +807,21 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable,
|
||||
.rowSize = sizeof(SSuperTableObj) + schemaSize
|
||||
.rowSize = sizeof(SSuperTableObj) + schemaSize,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mnodeDestroySuperTable(pStable);
|
||||
mError("table:%s, failed to create, sdb error", pCreate->tableId);
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
} else {
|
||||
mLPrint("table:%s, is created, tags:%d fields:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
mLPrint("app:%p:%p, table:%s, is created, tags:%d fields:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
pStable->numOfTags, pStable->numOfColumns);
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -828,7 +839,8 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
pDrop->uid = htobe64(pStable->uid);
|
||||
mnodeExtractTableName(pStable->info.tableId, pDrop->tableId);
|
||||
|
||||
mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, pVgroup->vgId);
|
||||
mPrint("app:%p:%p, stable:%s, send drop stable msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
pVgroup->vgId);
|
||||
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup);
|
||||
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
|
||||
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||
|
@ -842,11 +854,16 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -861,20 +878,22 @@ static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) {
|
||||
static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSuperTableObj *pStable, SSchema schema[], int32_t ntags) {
|
||||
if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) {
|
||||
mError("stable:%s, add tag, too many tags", pStable->info.tableId);
|
||||
mError("app:%p:%p, stable:%s, add tag, too many tags", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
return TSDB_CODE_MND_TOO_MANY_TAGS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ntags; i++) {
|
||||
if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
|
||||
mError("stable:%s, add tag, column:%s already exist", pStable->info.tableId, schema[i].name);
|
||||
mError("app:%p:%p, stable:%s, add tag, column:%s already exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pStable->info.tableId, schema[i].name);
|
||||
return TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
}
|
||||
|
||||
if (mnodeFindSuperTableTagIndex(pStable, schema[i].name) > 0) {
|
||||
mError("stable:%s, add tag, tag:%s already exist", pStable->info.tableId, schema[i].name);
|
||||
mError("app:%p:%p, stable:%s, add tag, tag:%s already exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
schema[i].name);
|
||||
return TSDB_CODE_MND_FIELD_ALREAY_EXIST;
|
||||
}
|
||||
}
|
||||
|
@ -895,22 +914,25 @@ static int32_t mnodeAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[],
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mPrint("app:%p:%p, stable %s, succeed to add tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
schema[0].name);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("stable %s, succeed to add tag %s", pStable->info.tableId, schema[0].name);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
|
||||
static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, SSuperTableObj *pStable, char *tagName) {
|
||||
int32_t col = mnodeFindSuperTableTagIndex(pStable, tagName);
|
||||
if (col < 0) {
|
||||
mError("stable:%s, drop tag, tag:%s not exist", pStable->info.tableId, tagName);
|
||||
mError("app:%p:%p, stable:%s, drop tag, tag:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
tagName);
|
||||
return TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
@ -922,22 +944,25 @@ static int32_t mnodeDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mPrint("app:%p:%p, stable %s, succeed to drop tag %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, tagName);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("stable %s, succeed to drop tag %s", pStable->info.tableId, tagName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
|
||||
static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, SSuperTableObj *pStable, char *oldTagName,
|
||||
char *newTagName) {
|
||||
int32_t col = mnodeFindSuperTableTagIndex(pStable, oldTagName);
|
||||
if (col < 0) {
|
||||
mError("stable:%s, failed to modify table tag, oldName: %s, newName: %s", pStable->info.tableId, oldTagName, newTagName);
|
||||
mError("app:%p:%p, stable:%s, failed to modify table tag, oldName: %s, newName: %s", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pStable->info.tableId, oldTagName, newTagName);
|
||||
return TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
@ -958,16 +983,18 @@ static int32_t mnodeModifySuperTableTagName(SSuperTableObj *pStable, char *oldTa
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mPrint("app:%p:%p, stable %s, succeed to modify tag %s to %s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
oldTagName, newTagName);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("stable %s, succeed to modify tag %s to %s", pStable->info.tableId, oldTagName, newTagName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) {
|
||||
|
@ -981,20 +1008,23 @@ static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *col
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
|
||||
static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
if (ncols <= 0) {
|
||||
mError("stable:%s, add column, ncols:%d <= 0", pStable->info.tableId);
|
||||
mError("app:%p:%p, stable:%s, add column, ncols:%d <= 0", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ncols; i++) {
|
||||
if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
|
||||
mError("stable:%s, add column, column:%s already exist", pStable->info.tableId, schema[i].name);
|
||||
mError("app:%p:%p, stable:%s, add column, column:%s already exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pStable->info.tableId, schema[i].name);
|
||||
return TSDB_CODE_MND_FIELD_ALREAY_EXIST;
|
||||
}
|
||||
|
||||
if (mnodeFindSuperTableTagIndex(pStable, schema[i].name) > 0) {
|
||||
mError("stable:%s, add column, tag:%s already exist", pStable->info.tableId, schema[i].name);
|
||||
mError("app:%p:%p, stable:%s, add column, tag:%s already exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pStable->info.tableId, schema[i].name);
|
||||
return TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
}
|
||||
}
|
||||
|
@ -1023,22 +1053,25 @@ static int32_t mnodeAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SS
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mPrint("app:%p:%p, stable %s, succeed to add column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("stable %s, succeed to add column", pStable->info.tableId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, char *colName) {
|
||||
static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, SSuperTableObj *pStable, char *colName) {
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
int32_t col = mnodeFindSuperTableColumnIndex(pStable, colName);
|
||||
if (col <= 0) {
|
||||
mError("stable:%s, drop column, column:%s not exist", pStable->info.tableId, colName);
|
||||
mError("app:%p:%p, stable:%s, drop column, column:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
|
||||
colName);
|
||||
return TSDB_CODE_MND_FIELD_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
@ -1060,16 +1093,17 @@ static int32_t mnodeDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, c
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsSuperTableSdb,
|
||||
.pObj = pStable
|
||||
.pObj = pStable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mPrint("app:%p:%p, stable %s, succeed to delete column", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("stable %s, succeed to delete column", pStable->info.tableId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
// show super tables
|
||||
|
@ -1259,7 +1293,8 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
|||
|
||||
pMsg->rpcRsp.rsp = pMeta;
|
||||
|
||||
mTrace("stable:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
|
||||
mTrace("app:%p:%p, stable:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pTable->uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1290,12 +1325,13 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
char * stableName = (char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN)*i;
|
||||
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||
if (pTable == NULL) {
|
||||
mError("stable:%s, not exist while get stable vgroup info", stableName);
|
||||
mError("app:%p:%p, stable:%s, not exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, stableName);
|
||||
mnodeDecTableRef(pTable);
|
||||
continue;
|
||||
}
|
||||
if (pTable->vgHash == NULL) {
|
||||
mError("stable:%s, not vgroup exist while get stable vgroup info", stableName);
|
||||
mError("app:%p:%p, stable:%s, not vgroup exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg,
|
||||
stableName);
|
||||
mnodeDecTableRef(pTable);
|
||||
|
||||
// even this super table has no corresponding table, still return
|
||||
|
@ -1430,12 +1466,35 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
|
|||
return pCreate;
|
||||
}
|
||||
|
||||
static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t tid) {
|
||||
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
|
||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
|
||||
if (pMDCreate == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = pMsg,
|
||||
.pCont = pMDCreate,
|
||||
.contLen = htonl(pMDCreate->contLen),
|
||||
.code = 0,
|
||||
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
|
||||
};
|
||||
|
||||
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
|
||||
SVgObj *pVgroup = pMsg->pVgroup;
|
||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SChildTableObj *pTable = calloc(1, sizeof(SChildTableObj));
|
||||
if (pTable == NULL) {
|
||||
mError("table:%s, failed to alloc memory", pCreate->tableId);
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
mError("app:%p:%p, table:%s, failed to alloc memory", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (pCreate->numOfColumns == 0) {
|
||||
|
@ -1453,10 +1512,10 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
|
|||
STagData *pTagData = (STagData *) pCreate->schema; // it is a tag key
|
||||
SSuperTableObj *pSuperTable = mnodeGetSuperTable(pTagData->name);
|
||||
if (pSuperTable == NULL) {
|
||||
mError("table:%s, corresponding super table:%s does not exist", pCreate->tableId, pTagData->name);
|
||||
mError("app:%p:%p, table:%s, corresponding super table:%s does not exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pCreate->tableId, pTagData->name);
|
||||
mnodeDestroyChildTable(pTable);
|
||||
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
return NULL;
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
mnodeDecTableRef(pSuperTable);
|
||||
|
||||
|
@ -1475,8 +1534,7 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
|
|||
pTable->schema = (SSchema *) calloc(1, schemaSize);
|
||||
if (pTable->schema == NULL) {
|
||||
free(pTable);
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
|
||||
|
||||
|
@ -1492,42 +1550,51 @@ static SChildTableObj* mnodeDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgOb
|
|||
pTable->sql = calloc(1, pTable->sqlLen);
|
||||
if (pTable->sql == NULL) {
|
||||
free(pTable);
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
|
||||
pTable->sql[pTable->sqlLen - 1] = 0;
|
||||
mTrace("table:%s, stream sql len:%d sql:%s", pTable->info.tableId, pTable->sqlLen, pTable->sql);
|
||||
mTrace("app:%p:%p, table:%s, stream sql len:%d sql:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
pTable->sqlLen, pTable->sql);
|
||||
}
|
||||
}
|
||||
|
||||
pMsg->pTable = (STableObj *)pTable;
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
|
||||
SSdbOper desc = {0};
|
||||
desc.type = SDB_OPER_GLOBAL;
|
||||
desc.pObj = pTable;
|
||||
desc.table = tsChildTableSdb;
|
||||
desc.pMsg = pMsg;
|
||||
desc.cb = mnodeDoCreateChildTableCb;
|
||||
|
||||
if (sdbInsertRow(&desc) != TSDB_CODE_SUCCESS) {
|
||||
int32_t code = sdbInsertRow(&desc);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
free(pTable);
|
||||
mError("table:%s, update sdb error", pCreate->tableId);
|
||||
terrno = TSDB_CODE_MND_SDB_ERROR;
|
||||
return NULL;
|
||||
mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
|
||||
tstrerror(code));
|
||||
pMsg->pTable = NULL;
|
||||
return code;
|
||||
} else {
|
||||
mTrace("app:%p:%p, table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
mTrace("table:%s, create table in vgroup:%d, id:%d, uid:%" PRIu64 , pTable->info.tableId, pVgroup->vgId, pTable->sid, pTable->uid);
|
||||
return pTable;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
|
||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
int32_t code = grantCheck(TSDB_GRANT_TIMESERIES);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("table:%s, failed to create, grant timeseries failed", pCreate->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to create, grant timeseries failed", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pCreate->tableId);
|
||||
return code;
|
||||
}
|
||||
|
||||
SVgObj *pVgroup = mnodeGetAvailableVgroup(pMsg->pDb);
|
||||
if (pVgroup == NULL) {
|
||||
mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
|
||||
mTrace("app:%p:%p, table:%s, start to create a new vgroup", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return mnodeCreateVgroup(pMsg, pMsg->pDb);
|
||||
}
|
||||
|
||||
|
@ -1535,55 +1602,53 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
|
|||
if (pMsg->pTable == NULL) {
|
||||
int32_t sid = taosAllocateId(pVgroup->idPool);
|
||||
if (sid <= 0) {
|
||||
mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId);
|
||||
mTrace("app:%p:%p, table:%s, no enough sid in vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
|
||||
pVgroup->vgId);
|
||||
return mnodeCreateVgroup(pMsg, pMsg->pDb);
|
||||
}
|
||||
|
||||
pMsg->pTable = (STableObj *)mnodeDoCreateChildTable(pCreate, pVgroup, sid);
|
||||
if (pMsg->pTable == NULL) {
|
||||
return terrno;
|
||||
if (pMsg->pVgroup == NULL) {
|
||||
pMsg->pVgroup = pVgroup;
|
||||
mnodeIncVgroupRef(pVgroup);
|
||||
}
|
||||
|
||||
mnodeIncTableRef(pMsg->pTable);
|
||||
mTrace("app:%p:%p, table:%s, create table in vgroup, vgId:%d sid:%d", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
|
||||
pVgroup->vgId, sid);
|
||||
|
||||
code = mnodeDoCreateChildTable(pMsg, sid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
} else {
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId);
|
||||
}
|
||||
|
||||
if (pMsg->pTable == NULL) {
|
||||
mError("app:%p:%p, table:%s, object not found, retry:%d reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
|
||||
tstrerror(terrno));
|
||||
return terrno;
|
||||
} else {
|
||||
mTrace("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
|
||||
return mnodeDoCreateChildTableCb(pMsg, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, (SChildTableObj *)pMsg->pTable);
|
||||
if (pMDCreate == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pVgroup);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = pMsg,
|
||||
.pCont = pMDCreate,
|
||||
.contLen = htonl(pMDCreate->contLen),
|
||||
.code = 0,
|
||||
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
|
||||
};
|
||||
|
||||
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
|
||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
|
||||
if (pMsg->pVgroup == NULL) {
|
||||
mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId);
|
||||
mError("app:%p:%p, table:%s, failed to drop ctable, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId);
|
||||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
|
||||
if (pDrop == NULL) {
|
||||
mError("table:%s, failed to drop ctable, no enough memory", pTable->info.tableId);
|
||||
mError("app:%p:%p, table:%s, failed to drop ctable, no enough memory", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -1595,7 +1660,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
||||
|
||||
mPrint("table:%s, send drop ctable msg", pDrop->tableId);
|
||||
mPrint("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = pMsg,
|
||||
.pCont = pDrop,
|
||||
|
@ -1624,15 +1689,17 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSchema schema[], int32_t ncols) {
|
||||
static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SChildTableObj *pTable, SSchema schema[], int32_t ncols) {
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
if (ncols <= 0) {
|
||||
mError("table:%s, add column, ncols:%d <= 0", pTable->info.tableId);
|
||||
mError("app:%p:%p, table:%s, add column, ncols:%d <= 0", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
|
||||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ncols; i++) {
|
||||
if (mnodeFindNormalTableColumnIndex(pTable, schema[i].name) > 0) {
|
||||
mError("table:%s, add column, column:%s already exist", pTable->info.tableId, schema[i].name);
|
||||
mError("app:%p:%p, table:%s, add column, column:%s already exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, schema[i].name);
|
||||
return TSDB_CODE_MND_FIELD_ALREAY_EXIST;
|
||||
}
|
||||
}
|
||||
|
@ -1659,22 +1726,25 @@ static int32_t mnodeAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SS
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable
|
||||
.pObj = pTable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mPrint("app:%p:%p, table %s, succeed to add column", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("table %s, succeed to add column", pTable->info.tableId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, char *colName) {
|
||||
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, SChildTableObj *pTable, char *colName) {
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
int32_t col = mnodeFindNormalTableColumnIndex(pTable, colName);
|
||||
if (col <= 0) {
|
||||
mError("table:%s, drop column, column:%s not exist", pTable->info.tableId, colName);
|
||||
mError("app:%p:%p, table:%s, drop column, column:%s not exist", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
colName);
|
||||
return TSDB_CODE_MND_FIELD_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
@ -1691,16 +1761,17 @@ static int32_t mnodeDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, c
|
|||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable
|
||||
.pObj = pTable,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
mPrint("app:%p:%p, table %s, succeed to drop column %s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, colName);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("table %s, succeed to drop column %s", pTable->info.tableId, colName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) {
|
||||
|
@ -1742,7 +1813,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
|
||||
if (pMsg->pVgroup == NULL) pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
|
||||
if (pMsg->pVgroup == NULL) {
|
||||
mError("table:%s, failed to get table meta, vgroup not exist", pTable->info.tableId);
|
||||
mError("app:%p:%p, table:%s, failed to get table meta, vgroup not exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId);
|
||||
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
@ -1756,7 +1828,8 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
|
|||
}
|
||||
pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId);
|
||||
|
||||
mTrace("table:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
|
||||
mTrace("app:%p:%p, table:%s, uid:%" PRIu64 " table meta is retrieved", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pTable->info.tableId, pTable->uid);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1768,7 +1841,8 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
|
|||
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + ntohl(pTag->dataLen);
|
||||
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
|
||||
if (pCreateMsg == NULL) {
|
||||
mError("table:%s, failed to create table while get meta info, no enough memory", pInfo->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to create table while get meta info, no enough memory", pMsg->rpcMsg.ahandle,
|
||||
pMsg, pInfo->tableId);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -1780,7 +1854,8 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
|
|||
pCreateMsg->contLen = htonl(contLen);
|
||||
|
||||
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg));
|
||||
mTrace("table:%s, start to create on demand, stable:%s", pInfo->tableId, ((STagData *)(pCreateMsg->schema))->name);
|
||||
mTrace("app:%p:%p, table:%s, start to create on demand, stable:%s", pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId,
|
||||
((STagData *)(pCreateMsg->schema))->name);
|
||||
|
||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||
pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
|
||||
|
@ -1791,9 +1866,11 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) {
|
||||
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
|
||||
STableMetaMsg *pMeta =
|
||||
rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
|
||||
if (pMeta == NULL) {
|
||||
mError("table:%s, failed to get table meta, no enough memory", pMsg->pTable->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to get table meta, no enough memory", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pMsg->pTable->tableId);
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -1908,11 +1985,13 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
|
|||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->vgId = htonl(pCfg->vgId);
|
||||
pCfg->sid = htonl(pCfg->sid);
|
||||
mTrace("dnode:%d, vgId:%d sid:%d, receive table config msg", pCfg->dnodeId, pCfg->vgId, pCfg->sid);
|
||||
mTrace("app:%p:%p, dnode:%d, vgId:%d sid:%d, receive table config msg", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId,
|
||||
pCfg->vgId, pCfg->sid);
|
||||
|
||||
SChildTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid);
|
||||
if (pTable == NULL) {
|
||||
mError("dnode:%d, vgId:%d sid:%d, table not found", pCfg->dnodeId, pCfg->vgId, pCfg->sid);
|
||||
mError("app:%p:%p, dnode:%d, vgId:%d sid:%d, table not found", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId,
|
||||
pCfg->vgId, pCfg->sid);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_ID;
|
||||
}
|
||||
|
||||
|
@ -1936,17 +2015,19 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
|
||||
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
||||
assert(pTable);
|
||||
mPrint("table:%s, drop table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
|
||||
mPrint("app:%p:%p, table:%s, drop table rsp received, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
|
||||
|
||||
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
mError("table:%s, failed to drop in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code));
|
||||
mError("app:%p:%p, table:%s, failed to drop in dnode, reason:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||
pTable->info.tableId, tstrerror(rpcMsg->code));
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mnodeMsg->pVgroup == NULL) mnodeMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
|
||||
if (mnodeMsg->pVgroup == NULL) {
|
||||
mError("table:%s, failed to get vgroup", pTable->info.tableId);
|
||||
mError("app:%p:%p, table:%s, failed to get vgroup", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId);
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_MND_VGROUP_NOT_EXIST);
|
||||
return;
|
||||
}
|
||||
|
@ -1959,21 +2040,24 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("table:%s, update ctables sdb error", pTable->info.tableId);
|
||||
mError("app:%p:%p, table:%s, update ctables sdb error", mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId);
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_MND_SDB_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mnodeMsg->pVgroup->numOfTables <= 0) {
|
||||
mPrint("vgId:%d, all tables is dropped, drop vgroup", mnodeMsg->pVgroup->vgId);
|
||||
mPrint("app:%p:%p, vgId:%d, all tables is dropped, drop vgroup", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||
mnodeMsg->pVgroup->vgId);
|
||||
mnodeDropVgroup(mnodeMsg->pVgroup, NULL);
|
||||
}
|
||||
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
// handle create table response from dnode
|
||||
// if failed, drop the table cached
|
||||
/*
|
||||
* handle create table response from dnode
|
||||
* if failed, drop the table cached
|
||||
*/
|
||||
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||
if (rpcMsg->handle == NULL) return;
|
||||
|
||||
|
@ -1982,36 +2066,36 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
|
||||
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
||||
assert(pTable);
|
||||
mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle,
|
||||
tstrerror(rpcMsg->code));
|
||||
|
||||
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
if (mnodeMsg->retry++ < 10) {
|
||||
mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId,
|
||||
mnodeMsg->retry, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
|
||||
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
|
||||
} else {
|
||||
mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId,
|
||||
mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
|
||||
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsChildTableSdb,
|
||||
.pObj = pTable
|
||||
};
|
||||
sdbDeleteRow(&oper);
|
||||
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
|
||||
}
|
||||
} else {
|
||||
mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, mnodeMsg->rpcMsg.handle,
|
||||
tstrerror(rpcMsg->code));
|
||||
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
|
||||
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont;
|
||||
if (pCreate->getMeta) {
|
||||
mTrace("table:%s, continue to get meta", pTable->info.tableId);
|
||||
mTrace("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p result:%s",
|
||||
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle,
|
||||
tstrerror(rpcMsg->code));
|
||||
|
||||
mnodeMsg->retry = 0;
|
||||
dnodeReprocessMnodeWriteMsg(mnodeMsg);
|
||||
} else {
|
||||
mTrace("app:%p:%p, table:%s, created in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
|
||||
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
} else {
|
||||
if (mnodeMsg->retry++ < 10) {
|
||||
mTrace("app:%p:%p, table:%s, create table rsp received, need retry, times:%d result:%s thandle:%p",
|
||||
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, tstrerror(rpcMsg->code),
|
||||
mnodeMsg->rpcMsg.handle);
|
||||
|
||||
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
|
||||
} else {
|
||||
mError("app:%p:%p, table:%s, failed to create in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
|
||||
pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
|
||||
|
||||
SSdbOper oper = {.type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, .pObj = pTable};
|
||||
sdbDeleteRow(&oper);
|
||||
|
||||
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
|
||||
}
|
||||
}
|
||||
|
@ -2202,22 +2286,24 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
|
|||
|
||||
static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
|
||||
SCMAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
|
||||
mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->rpcMsg.handle);
|
||||
mTrace("app:%p:%p, table:%s, alter table msg is received from thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pAlter->tableId, pMsg->rpcMsg.handle);
|
||||
|
||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pAlter->tableId);
|
||||
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||
mError("table:%s, failed to alter table, db not selected", pAlter->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to alter table, db not selected", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId);
|
||||
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
}
|
||||
|
||||
if (mnodeCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
|
||||
mError("table:%s, failed to alter table, its log db", pAlter->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to alter table, its log db", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId);
|
||||
return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN;
|
||||
}
|
||||
|
||||
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pAlter->tableId);
|
||||
if (pMsg->pTable == NULL) {
|
||||
mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId);
|
||||
mError("app:%p:%p, table:%s, failed to alter table, table not exist", pMsg->rpcMsg.ahandle, pMsg,
|
||||
pMsg->pTable->tableId);
|
||||
return TSDB_CODE_MND_INVALID_TABLE_NAME;
|
||||
}
|
||||
|
||||
|
@ -2226,7 +2312,8 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
|
|||
pAlter->tagValLen = htonl(pAlter->tagValLen);
|
||||
|
||||
if (pAlter->numOfCols > 2) {
|
||||
mError("table:%s, error numOfCols:%d in alter table", pAlter->tableId, pAlter->numOfCols);
|
||||
mError("app:%p:%p, table:%s, error numOfCols:%d in alter table", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId,
|
||||
pAlter->numOfCols);
|
||||
return TSDB_CODE_MND_APP_ERROR;
|
||||
}
|
||||
|
||||
|
@ -2237,29 +2324,29 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
|
|||
int32_t code = TSDB_CODE_COM_OPS_NOT_SUPPORT;
|
||||
if (pMsg->pTable->type == TSDB_SUPER_TABLE) {
|
||||
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
|
||||
mTrace("table:%s, start to alter stable", pAlter->tableId);
|
||||
mTrace("app:%p:%p, table:%s, start to alter stable", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId);
|
||||
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
|
||||
code = mnodeAddSuperTableTag(pTable, pAlter->schema, 1);
|
||||
code = mnodeAddSuperTableTag(pMsg, pTable, pAlter->schema, 1);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
|
||||
code = mnodeDropSuperTableTag(pTable, pAlter->schema[0].name);
|
||||
code = mnodeDropSuperTableTag(pMsg, pTable, pAlter->schema[0].name);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
|
||||
code = mnodeModifySuperTableTagName(pTable, pAlter->schema[0].name, pAlter->schema[1].name);
|
||||
code = mnodeModifySuperTableTagName(pMsg, pTable, pAlter->schema[0].name, pAlter->schema[1].name);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
||||
code = mnodeAddSuperTableColumn(pMsg->pDb, pTable, pAlter->schema, 1);
|
||||
code = mnodeAddSuperTableColumn(pMsg, pTable, pAlter->schema, 1);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
|
||||
code = mnodeDropSuperTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name);
|
||||
code = mnodeDropSuperTableColumn(pMsg, pTable, pAlter->schema[0].name);
|
||||
} else {
|
||||
}
|
||||
} else {
|
||||
mTrace("table:%s, start to alter ctable", pAlter->tableId);
|
||||
mTrace("app:%p:%p, table:%s, start to alter ctable", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId);
|
||||
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
|
||||
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
|
||||
char *tagVal = (char*)(pAlter->schema + pAlter->numOfCols);
|
||||
char *tagVal = (char *)(pAlter->schema + pAlter->numOfCols);
|
||||
code = mnodeModifyChildTableTagValue(pTable, pAlter->schema[0].name, tagVal);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
|
||||
code = mnodeAddNormalTableColumn(pMsg->pDb, pTable, pAlter->schema, 1);
|
||||
code = mnodeAddNormalTableColumn(pMsg, pTable, pAlter->schema, 1);
|
||||
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
|
||||
code = mnodeDropNormalTableColumn(pMsg->pDb, pTable, pAlter->schema[0].name);
|
||||
code = mnodeDropNormalTableColumn(pMsg, pTable, pAlter->schema[0].name);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
|
|
@ -102,11 +102,13 @@ static int32_t mnodeUserActionDecode(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
static int32_t mnodeUserActionRestored() {
|
||||
if (dnodeIsFirstDeploy()) {
|
||||
int32_t numOfRows = sdbGetNumOfRows(tsUserSdb);
|
||||
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||
mPrint("dnode first deploy, create root user");
|
||||
SAcctObj *pAcct = mnodeGetAcct("root");
|
||||
mnodeCreateUser(pAcct, "root", "taosdata");
|
||||
mnodeCreateUser(pAcct, "monitor", tsInternalPass);
|
||||
mnodeCreateUser(pAcct, "_root", tsInternalPass);
|
||||
mnodeCreateUser(pAcct, "root", "taosdata", NULL);
|
||||
mnodeCreateUser(pAcct, "monitor", tsInternalPass, NULL);
|
||||
mnodeCreateUser(pAcct, "_root", tsInternalPass, NULL);
|
||||
mnodeDecAcctRef(pAcct);
|
||||
}
|
||||
|
||||
|
@ -170,22 +172,24 @@ void mnodeDecUserRef(SUserObj *pUser) {
|
|||
return sdbDecRef(tsUserSdb, pUser);
|
||||
}
|
||||
|
||||
static int32_t mnodeUpdateUser(SUserObj *pUser) {
|
||||
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser
|
||||
.pObj = pUser,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbUpdateRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
||||
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
|
||||
int32_t code = acctCheck(pAcct, ACCT_GRANT_USER);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -226,28 +230,33 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser,
|
||||
.rowSize = sizeof(SUserObj)
|
||||
.rowSize = sizeof(SUserObj),
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tfree(pUser);
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
} else {
|
||||
mLPrint("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropUser(SUserObj *pUser) {
|
||||
static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsUserSdb,
|
||||
.pObj = pUser
|
||||
.pObj = pUser,
|
||||
.pMsg = pMsg
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_MND_SDB_ERROR;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -357,22 +366,25 @@ SUserObj *mnodeGetUserFromConn(void *pConn) {
|
|||
}
|
||||
}
|
||||
|
||||
char *mnodeGetUserFromMsg(void *pMsg) {
|
||||
SMnodeMsg *pMnodeMsg = pMsg;
|
||||
if (pMnodeMsg != NULL &&pMnodeMsg->pUser != NULL) {
|
||||
return pMnodeMsg->pUser->user;
|
||||
} else {
|
||||
return "system";
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
||||
int32_t code;
|
||||
SUserObj *pOperUser = pMsg->pUser;
|
||||
|
||||
if (pOperUser->superAuth) {
|
||||
SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user);
|
||||
}
|
||||
return mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg);
|
||||
} else {
|
||||
mError("user:%s, no rights to create user", pOperUser->user);
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
return TSDB_CODE_MND_NO_RIGHTS;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
||||
|
@ -409,8 +421,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
|||
if (hasRight) {
|
||||
memset(pUser->pass, 0, sizeof(pUser->pass));
|
||||
taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass);
|
||||
code = mnodeUpdateUser(pUser);
|
||||
mLPrint("user:%s, password is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code));
|
||||
code = mnodeUpdateUser(pUser, pMsg);
|
||||
} else {
|
||||
mError("user:%s, no rights to alter user", pOperUser->user);
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
|
@ -450,8 +461,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
|||
pUser->writeAuth = 1;
|
||||
}
|
||||
|
||||
code = mnodeUpdateUser(pUser);
|
||||
mLPrint("user:%s, privilege is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code));
|
||||
code = mnodeUpdateUser(pUser, pMsg);
|
||||
} else {
|
||||
mError("user:%s, no rights to alter user", pOperUser->user);
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
|
@ -497,10 +507,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
if (hasRight) {
|
||||
code = mnodeDropUser(pUser);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("user:%s, is dropped by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code));
|
||||
}
|
||||
code = mnodeDropUser(pUser, pMsg);
|
||||
} else {
|
||||
code = TSDB_CODE_MND_NO_RIGHTS;
|
||||
}
|
||||
|
|
|
@ -299,6 +299,27 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
|
|||
return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup);
|
||||
}
|
||||
|
||||
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pMsg->pVgroup = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
SVgObj *pVgroup = pMsg->pVgroup;
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
|
||||
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
|
||||
}
|
||||
|
||||
mnodeIncVgroupRef(pVgroup);
|
||||
pMsg->expected = pVgroup->numOfVnodes;
|
||||
mnodeSendCreateVgroupMsg(pVgroup, pMsg);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
|
||||
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
|
||||
strcpy(pVgroup->dbName, pDb->name);
|
||||
|
@ -314,26 +335,22 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsVgroupSdb,
|
||||
.pObj = pVgroup,
|
||||
.rowSize = sizeof(SVgObj)
|
||||
.rowSize = sizeof(SVgObj),
|
||||
.pMsg = pMsg,
|
||||
.cb = mnodeCreateVgroupCb
|
||||
};
|
||||
|
||||
pMsg->pVgroup = pVgroup;
|
||||
|
||||
int32_t code = sdbInsertRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pMsg->pVgroup = NULL;
|
||||
tfree(pVgroup);
|
||||
return TSDB_CODE_MND_SDB_ERROR;
|
||||
} else {
|
||||
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
|
||||
}
|
||||
|
||||
mnodeIncVgroupRef(pVgroup);
|
||||
pMsg->pVgroup = pVgroup;
|
||||
pMsg->expected = pVgroup->numOfVnodes;
|
||||
mnodeSendCreateVgroupMsg(pVgroup, pMsg);
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
|
||||
|
@ -596,7 +613,6 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) {
|
|||
}
|
||||
|
||||
void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("vgId:%d, send create vnode:%d msg, ahandle:%p db:%s", pVgroup->vgId, pVgroup->vgId, ahandle, pVgroup->dbName);
|
||||
SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = ahandle,
|
||||
|
@ -609,9 +625,12 @@ void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
|||
}
|
||||
|
||||
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
mTrace("vgId:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
mTrace("vgId:%d, send create all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
|
||||
pVgroup->dbName);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
mTrace("vgId:%d, index:%d, send create vnode msg to dnode %s, ahandle:%p", pVgroup->vgId,
|
||||
i, pVgroup->vnodeGid[i].pDnode->dnodeEp, ahandle);
|
||||
mnodeSendCreateVnodeMsg(pVgroup, &ipSet, ahandle);
|
||||
}
|
||||
}
|
||||
|
@ -729,6 +748,7 @@ static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
|
||||
mTrace("vgId:%d, send create vnode msg to dnode %s for vnode cfg msg", pVgroup->vgId, pDnode->dnodeEp);
|
||||
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp);
|
||||
mnodeSendCreateVnodeMsg(pVgroup, &ipSet, NULL);
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ void mnodeAddWriteMsgHandle(uint8_t msgType, int32_t (*fp)(SMnodeMsg *mnodeMsg))
|
|||
|
||||
int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("%p, msg:%s in mwrite queue, content is null", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
mError("app:%p:%p, msg:%s content is null", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
return TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
}
|
||||
|
||||
|
@ -54,27 +54,31 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
|
|||
rpcRsp->rsp = ipSet;
|
||||
rpcRsp->len = sizeof(SRpcIpSet);
|
||||
|
||||
mTrace("%p, msg:%s in mwrite queue, will be redireced inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], ipSet->inUse);
|
||||
mTrace("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
|
||||
ipSet->inUse);
|
||||
for (int32_t i = 0; i < ipSet->numOfIps; ++i) {
|
||||
mTrace("mnode index:%d ip:%s:%d", i, ipSet->fqdn[i], htons(ipSet->port[i]));
|
||||
mTrace("app:%p:%p, mnode index:%d ip:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, ipSet->fqdn[i],
|
||||
htons(ipSet->port[i]));
|
||||
}
|
||||
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
}
|
||||
|
||||
if (tsMnodeProcessWriteMsgFp[pMsg->rpcMsg.msgType] == NULL) {
|
||||
mError("%p, msg:%s in mwrite queue, not processed", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
mError("app:%p:%p, msg:%s not processed", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
}
|
||||
|
||||
int32_t code = mnodeInitMsg(pMsg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("%p, msg:%s in mwrite queue, not processed reason:%s", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], tstrerror(code));
|
||||
mError("app:%p:%p, msg:%s not processed, reason:%s", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!pMsg->pUser->writeAuth) {
|
||||
mError("%p, msg:%s in mwrite queue, not processed, no write auth", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
mError("app:%p:%p, msg:%s not processed, no write auth", pMsg->rpcMsg.ahandle, pMsg,
|
||||
taosMsg[pMsg->rpcMsg.msgType]);
|
||||
return TSDB_CODE_MND_NO_RIGHTS;
|
||||
}
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
|||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||
if (code < 0) {
|
||||
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
|
||||
break;;
|
||||
break;
|
||||
}
|
||||
|
||||
pThreadObj->pollFd = epoll_create(10); // size does not matter
|
||||
|
@ -367,7 +367,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
|
|||
recvInfo.ip = 0;
|
||||
recvInfo.port = 0;
|
||||
recvInfo.shandle = pThreadObj->shandle;
|
||||
recvInfo.thandle = pFdObj->thandle;;
|
||||
recvInfo.thandle = pFdObj->thandle;
|
||||
recvInfo.chandle = NULL;
|
||||
recvInfo.connType = RPC_CONN_TCP;
|
||||
(*(pThreadObj->processData))(&recvInfo);
|
||||
|
@ -414,7 +414,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
|
|||
pInfo->ip = pFdObj->ip;
|
||||
pInfo->port = pFdObj->port;
|
||||
pInfo->shandle = pThreadObj->shandle;
|
||||
pInfo->thandle = pFdObj->thandle;;
|
||||
pInfo->thandle = pFdObj->thandle;
|
||||
pInfo->chandle = pFdObj;
|
||||
pInfo->connType = RPC_CONN_TCP;
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ static FILE* fpAllocLog = NULL;
|
|||
// memory allocator which fails randomly
|
||||
|
||||
extern int32_t taosGetTimestampSec();
|
||||
static int32_t startTime = INT32_MAX;;
|
||||
static int32_t startTime = INT32_MAX;
|
||||
|
||||
static bool random_alloc_fail(size_t size, const char* file, uint32_t line) {
|
||||
if (taosGetTimestampSec() < startTime) {
|
||||
|
|
|
@ -119,7 +119,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
|
||||
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
|
||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
|
||||
tsdbCfg.compression = pVnodeCfg->cfg.compression;
|
||||
|
||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
|
|
|
@ -359,3 +359,4 @@ cd ../../../debug; make
|
|||
./test.sh -f unique/arbitrator/sync_replica3_alterTable_drop.sim
|
||||
./test.sh -f unique/arbitrator/sync_replica3_dropDb.sim
|
||||
./test.sh -f unique/arbitrator/sync_replica3_dropTable.sim
|
||||
|
||||
|
|
|
@ -20,10 +20,10 @@ system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
|
|||
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
|
||||
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
|
||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 20
|
||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 20
|
||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 20
|
||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 20
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c http -v 1
|
||||
system sh/cfg.sh -n dnode2 -c http -v 1
|
||||
|
|
|
@ -33,6 +33,7 @@ typedef struct {
|
|||
int threadIndex;
|
||||
char dbName[32];
|
||||
char stableName[64];
|
||||
float createTableSpeed;
|
||||
pthread_t thread;
|
||||
} SInfo;
|
||||
|
||||
|
@ -49,8 +50,8 @@ int64_t numOfThreads = 1;
|
|||
int64_t numOfTablesPerThread = 200;
|
||||
char dbName[32] = "db";
|
||||
char stableName[64] = "st";
|
||||
int32_t cache = 16384;
|
||||
int32_t tables = 1000;
|
||||
int32_t cache = 16;
|
||||
int32_t tables = 5000;
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
shellParseArgument(argc, argv);
|
||||
|
@ -63,9 +64,8 @@ int main(int argc, char *argv[]) {
|
|||
void createDbAndTable() {
|
||||
pPrint("start to create table");
|
||||
|
||||
TAOS_RES * pSql;
|
||||
TAOS * con;
|
||||
struct timeval systemTime;
|
||||
int64_t st, et;
|
||||
char qstr[64000];
|
||||
|
||||
char fqdn[TSDB_FQDN_LEN];
|
||||
|
@ -77,21 +77,23 @@ void createDbAndTable() {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables);
|
||||
if (taos_query(con, qstr)) {
|
||||
pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con));
|
||||
sprintf(qstr, "create database if not exists %s cache %d maxtables %d", dbName, cache, tables);
|
||||
pSql = taos_query(con, qstr);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
|
||||
sprintf(qstr, "use %s", dbName);
|
||||
if (taos_query(con, qstr)) {
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
int64_t totalTables = numOfTablesPerThread * numOfThreads;
|
||||
taos_free_result(pSql);
|
||||
|
||||
if (strcmp(stableName, "no") != 0) {
|
||||
int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName);
|
||||
|
@ -100,36 +102,14 @@ void createDbAndTable() {
|
|||
}
|
||||
sprintf(qstr + len, ") tags(t int)");
|
||||
|
||||
if (taos_query(con, qstr)) {
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
|
||||
if (taos_query(con, qstr)) {
|
||||
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t);
|
||||
for (int64_t f = 0; f < pointsPerTable; ++f) {
|
||||
len += sprintf(qstr + len, ", f%ld double", f);
|
||||
}
|
||||
sprintf(qstr + len, ")");
|
||||
|
||||
if (taos_query(con, qstr)) {
|
||||
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
pPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables);
|
||||
}
|
||||
|
||||
void insertData() {
|
||||
|
@ -144,7 +124,7 @@ void insertData() {
|
|||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
SInfo *pInfo = (SInfo *)malloc(sizeof(SInfo) * numOfThreads);
|
||||
SInfo *pInfo = (SInfo *)calloc(numOfThreads, sizeof(SInfo));
|
||||
|
||||
// Start threads to write
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
|
@ -173,10 +153,15 @@ void insertData() {
|
|||
double speedOfRows = totalRows / seconds;
|
||||
double speedOfPoints = totalPoints / seconds;
|
||||
|
||||
float createTableSpeed = 0;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
createTableSpeed += pInfo[i].createTableSpeed;
|
||||
}
|
||||
|
||||
pPrint(
|
||||
"%sall threads:%ld finished, use %.1lf seconds, tables:%.ld rows:%ld points:%ld, speed RowsPerSecond:%.1lf "
|
||||
"PointsPerSecond:%.1lf%s",
|
||||
GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, NC);
|
||||
"PointsPerSecond:%.1lf CreateTableSpeed:%.1f t/s %s",
|
||||
GREEN, numOfThreads, seconds, totalTables, totalRows, totalPoints, speedOfRows, speedOfPoints, createTableSpeed, NC);
|
||||
|
||||
pPrint("threads exit");
|
||||
|
||||
|
@ -191,6 +176,7 @@ void *syncTest(void *param) {
|
|||
int64_t st, et;
|
||||
char qstr[65000];
|
||||
int maxBytes = 60000;
|
||||
int code;
|
||||
|
||||
pPrint("thread:%d, start to run", pInfo->threadIndex);
|
||||
|
||||
|
@ -210,6 +196,48 @@ void *syncTest(void *param) {
|
|||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
if (strcmp(stableName, "no") != 0) {
|
||||
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
|
||||
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
} else {
|
||||
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
|
||||
int len = sprintf(qstr, "create table if not exists %s%ld(ts timestamp", stableName, t);
|
||||
for (int64_t f = 0; f < pointsPerTable; ++f) {
|
||||
len += sprintf(qstr + len, ", f%ld double", f);
|
||||
}
|
||||
sprintf(qstr + len, ")");
|
||||
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
float seconds = (et - st) / 1000.0 / 1000.0;
|
||||
int64_t tables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
|
||||
pInfo->createTableSpeed = (float)tables / seconds;
|
||||
pPrint("thread:%d, %.1f seconds to create %ld tables, speed:%.1f", pInfo->threadIndex, seconds, tables,
|
||||
pInfo->createTableSpeed);
|
||||
|
||||
if (pInfo->rowsPerTable == 0) return NULL;
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
int64_t start = 1430000000000;
|
||||
int64_t interval = 1000; // 1000 ms
|
||||
|
||||
|
@ -227,10 +255,13 @@ void *syncTest(void *param) {
|
|||
}
|
||||
len += sprintf(sql + len, ")");
|
||||
if (len > maxBytes) {
|
||||
if (taos_query(con, qstr)) {
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
|
||||
table, row, taos_errstr(con));
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
|
||||
// "insert into"
|
||||
len = sprintf(sql, "%s", inserStr);
|
||||
|
@ -239,7 +270,8 @@ void *syncTest(void *param) {
|
|||
}
|
||||
|
||||
if (len != strlen(inserStr)) {
|
||||
taos_query(con, qstr);
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
|
|
|
@ -49,8 +49,8 @@ int64_t numOfThreads = 1;
|
|||
int64_t numOfTablesPerThread = 1;
|
||||
char dbName[32] = "db";
|
||||
char stableName[64] = "st";
|
||||
int32_t cache = 16384;
|
||||
int32_t tables = 1000;
|
||||
int32_t cache = 16;
|
||||
int32_t tables = 5000;
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
shellParseArgument(argc, argv);
|
||||
|
@ -63,6 +63,7 @@ int main(int argc, char *argv[]) {
|
|||
void createDbAndTable() {
|
||||
pPrint("start to create table");
|
||||
|
||||
TAOS_RES * pSql;
|
||||
TAOS * con;
|
||||
struct timeval systemTime;
|
||||
int64_t st, et;
|
||||
|
@ -79,17 +80,22 @@ void createDbAndTable() {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
sprintf(qstr, "create database if not exists %s cache %d tables %d", dbName, cache, tables);
|
||||
if (taos_query(con, qstr)) {
|
||||
pError("failed to create database:%s, code:%d reason:%s", dbName, taos_errno(con), taos_errstr(con));
|
||||
sprintf(qstr, "create database if not exists %s cache %d maxtables %d", dbName, cache, tables);
|
||||
pSql = taos_query(con, qstr);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
|
||||
sprintf(qstr, "use %s", dbName);
|
||||
if (taos_query(con, qstr)) {
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
@ -102,17 +108,23 @@ void createDbAndTable() {
|
|||
}
|
||||
sprintf(qstr + len, ") tags(t int)");
|
||||
|
||||
if (taos_query(con, qstr)) {
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
sprintf(qstr, "create table if not exists %s%ld using %s tags(%ld)", stableName, t, stableName, t);
|
||||
if (taos_query(con, qstr)) {
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
}
|
||||
} else {
|
||||
for (int64_t t = 0; t < totalTables; ++t) {
|
||||
|
@ -122,16 +134,20 @@ void createDbAndTable() {
|
|||
}
|
||||
sprintf(qstr + len, ")");
|
||||
|
||||
if (taos_query(con, qstr)) {
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create table %s%ld, reason:%s", stableName, t, taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
}
|
||||
}
|
||||
|
||||
gettimeofday(&systemTime, NULL);
|
||||
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
pPrint("%.1f seconds to create %ld tables", (et - st) / 1000.0 / 1000.0, totalTables);
|
||||
float seconds = (et - st) / 1000.0 / 1000.0;
|
||||
pPrint("%.1f seconds to create %ld tables, speed:%.1f", seconds, totalTables, totalTables / seconds);
|
||||
}
|
||||
|
||||
void insertData() {
|
||||
|
@ -141,7 +157,12 @@ void insertData() {
|
|||
gettimeofday(&systemTime, NULL);
|
||||
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
|
||||
|
||||
if (rowsPerTable <= 0) {
|
||||
pPrint("not insert data for rowsPerTable is :%d", rowsPerTable);
|
||||
exit(0);
|
||||
} else {
|
||||
pPrint("%d threads are spawned to insert data", numOfThreads);
|
||||
}
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
|
@ -230,10 +251,13 @@ void *syncTest(void *param) {
|
|||
}
|
||||
len += sprintf(sql + len, ")");
|
||||
if (len > maxBytes) {
|
||||
if (taos_query(con, qstr)) {
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("thread:%d, failed to insert table:%s%ld row:%ld, reason:%s", pInfo->threadIndex, pInfo->stableName,
|
||||
table, row, taos_errstr(con));
|
||||
}
|
||||
taos_stop_query(pSql);
|
||||
|
||||
// "insert into"
|
||||
len = sprintf(sql, "%s", inserStr);
|
||||
|
|
Loading…
Reference in New Issue