Merge branch 'develop' into feature/2.0tsdb
This commit is contained in:
commit
1c4c9c9507
|
@ -191,7 +191,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
.msgType = pSql->cmd.msgType,
|
.msgType = pSql->cmd.msgType,
|
||||||
.pCont = pMsg,
|
.pCont = pMsg,
|
||||||
.contLen = pSql->cmd.payloadLen,
|
.contLen = pSql->cmd.payloadLen,
|
||||||
.handle = pSql,
|
.ahandle = pSql,
|
||||||
|
.handle = &pSql->pRpcCtx,
|
||||||
.code = 0
|
.code = 0
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -199,12 +200,12 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
// Otherwise, the pSql object may have been released already during the response function, which is
|
// Otherwise, the pSql object may have been released already during the response function, which is
|
||||||
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
|
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
|
||||||
// cause crash.
|
// cause crash.
|
||||||
/*pSql->pRpcCtx = */rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
|
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
|
||||||
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
|
SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
tscError("%p sql is already released", pSql);
|
tscError("%p sql is already released", pSql);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -117,6 +117,7 @@ extern char tsDataDir[];
|
||||||
extern char tsLogDir[];
|
extern char tsLogDir[];
|
||||||
extern char tsScriptDir[];
|
extern char tsScriptDir[];
|
||||||
extern int64_t tsMsPerDay[3];
|
extern int64_t tsMsPerDay[3];
|
||||||
|
extern char tsVnodeBakDir[];
|
||||||
|
|
||||||
// system info
|
// system info
|
||||||
extern char tsOsName[];
|
extern char tsOsName[];
|
||||||
|
|
|
@ -153,6 +153,7 @@ char tsDnodeDir[TSDB_FILENAME_LEN] = {0};
|
||||||
char tsMnodeDir[TSDB_FILENAME_LEN] = {0};
|
char tsMnodeDir[TSDB_FILENAME_LEN] = {0};
|
||||||
char tsDataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
|
char tsDataDir[TSDB_FILENAME_LEN] = "/var/lib/taos";
|
||||||
char tsScriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
|
char tsScriptDir[TSDB_FILENAME_LEN] = "/etc/taos";
|
||||||
|
char tsVnodeBakDir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* minimum scale for whole system, millisecond by default
|
* minimum scale for whole system, millisecond by default
|
||||||
|
|
|
@ -171,6 +171,7 @@ static int32_t dnodeInitStorage() {
|
||||||
sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
|
sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
|
||||||
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
|
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
|
||||||
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
|
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
|
||||||
|
sprintf(tsVnodeBakDir, "%s/vnode_bak", tsDataDir);
|
||||||
|
|
||||||
//TODO(dengyihao): no need to init here
|
//TODO(dengyihao): no need to init here
|
||||||
if (dnodeCreateDir(tsMnodeDir) < 0) {
|
if (dnodeCreateDir(tsMnodeDir) < 0) {
|
||||||
|
@ -186,6 +187,10 @@ static int32_t dnodeInitStorage() {
|
||||||
dError("failed to create dir: %s, reason: %s", tsDnodeDir, strerror(errno));
|
dError("failed to create dir: %s, reason: %s", tsDnodeDir, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (dnodeCreateDir(tsVnodeBakDir) < 0) {
|
||||||
|
dError("failed to create dir: %s, reason: %s", tsVnodeBakDir, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
dnodeCheckDataDirOpenned(tsDnodeDir);
|
dnodeCheckDataDirOpenned(tsDnodeDir);
|
||||||
|
|
||||||
|
|
|
@ -47,8 +47,8 @@ typedef struct SRpcMsg {
|
||||||
void *pCont;
|
void *pCont;
|
||||||
int contLen;
|
int contLen;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void *handle;
|
void *handle; // rpc handle returned to app
|
||||||
void *ahandle; //app handle set by client, for debug purpose
|
void *ahandle; // app handle set by client
|
||||||
} SRpcMsg;
|
} SRpcMsg;
|
||||||
|
|
||||||
typedef struct SRpcInit {
|
typedef struct SRpcInit {
|
||||||
|
@ -78,11 +78,11 @@ void rpcClose(void *);
|
||||||
void *rpcMallocCont(int contLen);
|
void *rpcMallocCont(int contLen);
|
||||||
void rpcFreeCont(void *pCont);
|
void rpcFreeCont(void *pCont);
|
||||||
void *rpcReallocCont(void *ptr, int contLen);
|
void *rpcReallocCont(void *ptr, int contLen);
|
||||||
void *rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg);
|
void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg);
|
||||||
void rpcSendResponse(const SRpcMsg *pMsg);
|
void rpcSendResponse(const SRpcMsg *pMsg);
|
||||||
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
|
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
|
||||||
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
int rpcReportProgress(void *pConn, char *pCont, int contLen);
|
||||||
void rpcCancelRequest(void *pContext);
|
void rpcCancelRequest(void *pContext);
|
||||||
|
|
||||||
|
|
|
@ -264,7 +264,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
|
||||||
strcpy(pMdCfgDnode->config, pCmCfgDnode->config);
|
strcpy(pMdCfgDnode->config, pCmCfgDnode->config);
|
||||||
|
|
||||||
SRpcMsg rpcMdCfgDnodeMsg = {
|
SRpcMsg rpcMdCfgDnodeMsg = {
|
||||||
.handle = 0,
|
.ahandle = 0,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
.msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE,
|
.msgType = TSDB_MSG_TYPE_MD_CONFIG_DNODE,
|
||||||
.pCont = pMdCfgDnode,
|
.pCont = pMdCfgDnode,
|
||||||
|
|
|
@ -1574,7 +1574,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||||
|
|
||||||
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = pMsg,
|
.ahandle = pMsg,
|
||||||
.pCont = pMDCreate,
|
.pCont = pMDCreate,
|
||||||
.contLen = htonl(pMDCreate->contLen),
|
.contLen = htonl(pMDCreate->contLen),
|
||||||
.code = 0,
|
.code = 0,
|
||||||
|
@ -1751,7 +1751,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
mInfo("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
mInfo("app:%p:%p, table:%s, send drop ctable msg", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = pMsg,
|
.ahandle = pMsg,
|
||||||
.pCont = pDrop,
|
.pCont = pDrop,
|
||||||
.contLen = sizeof(SMDDropTableMsg),
|
.contLen = sizeof(SMDDropTableMsg),
|
||||||
.code = 0,
|
.code = 0,
|
||||||
|
@ -1799,7 +1799,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
|
||||||
|
|
||||||
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = pMsg,
|
.ahandle = pMsg,
|
||||||
.pCont = pMDCreate,
|
.pCont = pMDCreate,
|
||||||
.contLen = htonl(pMDCreate->contLen),
|
.contLen = htonl(pMDCreate->contLen),
|
||||||
.code = 0,
|
.code = 0,
|
||||||
|
@ -2144,9 +2144,9 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
// handle drop child response
|
// handle drop child response
|
||||||
static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg->handle == NULL) return;
|
if (rpcMsg->ahandle == NULL) return;
|
||||||
|
|
||||||
SMnodeMsg *mnodeMsg = rpcMsg->handle;
|
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
|
||||||
mnodeMsg->received++;
|
mnodeMsg->received++;
|
||||||
|
|
||||||
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
||||||
|
@ -2195,9 +2195,9 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
* if failed, drop the table cached
|
* if failed, drop the table cached
|
||||||
*/
|
*/
|
||||||
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg->handle == NULL) return;
|
if (rpcMsg->ahandle == NULL) return;
|
||||||
|
|
||||||
SMnodeMsg *mnodeMsg = rpcMsg->handle;
|
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
|
||||||
mnodeMsg->received++;
|
mnodeMsg->received++;
|
||||||
|
|
||||||
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
||||||
|
@ -2238,9 +2238,9 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg->handle == NULL) return;
|
if (rpcMsg->ahandle == NULL) return;
|
||||||
|
|
||||||
SMnodeMsg *mnodeMsg = rpcMsg->handle;
|
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
|
||||||
mnodeMsg->received++;
|
mnodeMsg->received++;
|
||||||
|
|
||||||
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
|
||||||
|
|
|
@ -652,7 +652,7 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) {
|
||||||
void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup);
|
SMDCreateVnodeMsg *pCreate = mnodeBuildCreateVnodeMsg(pVgroup);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = ahandle,
|
.ahandle = ahandle,
|
||||||
.pCont = pCreate,
|
.pCont = pCreate,
|
||||||
.contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0,
|
.contLen = pCreate ? sizeof(SMDCreateVnodeMsg) : 0,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
|
@ -673,9 +673,9 @@ void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
||||||
if (rpcMsg->handle == NULL) return;
|
if (rpcMsg->ahandle == NULL) return;
|
||||||
|
|
||||||
SMnodeMsg *mnodeMsg = rpcMsg->handle;
|
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
|
||||||
mnodeMsg->received++;
|
mnodeMsg->received++;
|
||||||
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
|
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
|
||||||
mnodeMsg->successed++;
|
mnodeMsg->successed++;
|
||||||
|
@ -686,7 +686,7 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
||||||
SVgObj *pVgroup = mnodeMsg->pVgroup;
|
SVgObj *pVgroup = mnodeMsg->pVgroup;
|
||||||
mDebug("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
mDebug("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
||||||
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
|
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
|
||||||
mnodeMsg->rpcMsg.handle, rpcMsg->handle);
|
mnodeMsg->rpcMsg.handle, rpcMsg->ahandle);
|
||||||
|
|
||||||
if (mnodeMsg->received != mnodeMsg->expected) return;
|
if (mnodeMsg->received != mnodeMsg->expected) return;
|
||||||
|
|
||||||
|
@ -718,7 +718,7 @@ static SMDDropVnodeMsg *mnodeBuildDropVnodeMsg(int32_t vgId) {
|
||||||
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
|
void mnodeSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId);
|
SMDDropVnodeMsg *pDrop = mnodeBuildDropVnodeMsg(vgId);
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = ahandle,
|
.ahandle = ahandle,
|
||||||
.pCont = pDrop,
|
.pCont = pDrop,
|
||||||
.contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0,
|
.contLen = pDrop ? sizeof(SMDDropVnodeMsg) : 0,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
|
@ -737,10 +737,10 @@ static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
||||||
mDebug("drop vnode rsp is received, handle:%p", rpcMsg->handle);
|
mDebug("drop vnode rsp is received, handle:%p", rpcMsg->ahandle);
|
||||||
if (rpcMsg->handle == NULL) return;
|
if (rpcMsg->ahandle == NULL) return;
|
||||||
|
|
||||||
SMnodeMsg *mnodeMsg = rpcMsg->handle;
|
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
|
||||||
mnodeMsg->received++;
|
mnodeMsg->received++;
|
||||||
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
|
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
|
||||||
mnodeMsg->code = rpcMsg->code;
|
mnodeMsg->code = rpcMsg->code;
|
||||||
|
@ -750,7 +750,7 @@ static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
||||||
SVgObj *pVgroup = mnodeMsg->pVgroup;
|
SVgObj *pVgroup = mnodeMsg->pVgroup;
|
||||||
mDebug("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
mDebug("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
||||||
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
|
pVgroup->vgId, tstrerror(rpcMsg->code), mnodeMsg->received, mnodeMsg->successed, mnodeMsg->expected,
|
||||||
mnodeMsg->rpcMsg.handle, rpcMsg->handle);
|
mnodeMsg->rpcMsg.handle, rpcMsg->ahandle);
|
||||||
|
|
||||||
if (mnodeMsg->received != mnodeMsg->expected) return;
|
if (mnodeMsg->received != mnodeMsg->expected) return;
|
||||||
|
|
||||||
|
|
|
@ -236,6 +236,9 @@ void taosSetCoreDump();
|
||||||
|
|
||||||
void taosBlockSIGPIPE();
|
void taosBlockSIGPIPE();
|
||||||
|
|
||||||
|
int tSystem(const char * cmd) ;
|
||||||
|
|
||||||
|
|
||||||
#ifdef _ALPINE
|
#ifdef _ALPINE
|
||||||
typedef int(*__compar_fn_t)(const void *, const void *);
|
typedef int(*__compar_fn_t)(const void *, const void *);
|
||||||
void error (int, int, const char *);
|
void error (int, int, const char *);
|
||||||
|
|
|
@ -241,3 +241,32 @@ void taosBlockSIGPIPE() {
|
||||||
uError("failed to block SIGPIPE");
|
uError("failed to block SIGPIPE");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tSystem(const char * cmd)
|
||||||
|
{
|
||||||
|
FILE * fp;
|
||||||
|
int res;
|
||||||
|
char buf[1024];
|
||||||
|
if (cmd == NULL) {
|
||||||
|
uError("tSystem cmd is NULL!\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((fp = popen(cmd, "r") ) == NULL) {
|
||||||
|
uError("popen cmd:%s error: %s/n", cmd, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
while(fgets(buf, sizeof(buf), fp)) {
|
||||||
|
uDebug("popen result:%s", buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((res = pclose(fp)) == -1) {
|
||||||
|
uError("close popen file pointer fp error!\n");
|
||||||
|
} else {
|
||||||
|
uDebug("popen res is :%d\n", res);
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -354,13 +354,13 @@ void *rpcReallocCont(void *ptr, int contLen) {
|
||||||
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) {
|
void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
|
||||||
SRpcInfo *pRpc = (SRpcInfo *)shandle;
|
SRpcInfo *pRpc = (SRpcInfo *)shandle;
|
||||||
SRpcReqContext *pContext;
|
SRpcReqContext *pContext;
|
||||||
|
|
||||||
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
||||||
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||||
pContext->ahandle = pMsg->handle;
|
pContext->ahandle = pMsg->ahandle;
|
||||||
pContext->pRpc = (SRpcInfo *)shandle;
|
pContext->pRpc = (SRpcInfo *)shandle;
|
||||||
pContext->ipSet = *pIpSet;
|
pContext->ipSet = *pIpSet;
|
||||||
pContext->contLen = contLen;
|
pContext->contLen = contLen;
|
||||||
|
@ -380,9 +380,12 @@ void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg
|
||||||
|| type == TSDB_MSG_TYPE_CM_SHOW )
|
|| type == TSDB_MSG_TYPE_CM_SHOW )
|
||||||
pContext->connType = RPC_CONN_TCPC;
|
pContext->connType = RPC_CONN_TCPC;
|
||||||
|
|
||||||
|
// set the handle to pContext, so app can cancel the request
|
||||||
|
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
|
||||||
|
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
|
|
||||||
return pContext;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendResponse(const SRpcMsg *pRsp) {
|
void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
|
@ -483,7 +486,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
SRpcReqContext *pContext;
|
SRpcReqContext *pContext;
|
||||||
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||||
|
|
||||||
|
@ -1051,7 +1054,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
} else {
|
} else {
|
||||||
// it's a response
|
// it's a response
|
||||||
SRpcReqContext *pContext = pConn->pContext;
|
SRpcReqContext *pContext = pConn->pContext;
|
||||||
rpcMsg.handle = pContext->ahandle;
|
rpcMsg.handle = pContext;
|
||||||
pConn->pContext = NULL;
|
pConn->pContext = NULL;
|
||||||
|
|
||||||
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
|
// for UDP, port may be changed by server, the port in ipSet shall be used for cache
|
||||||
|
@ -1255,7 +1258,7 @@ static void rpcProcessConnError(void *param, void *id) {
|
||||||
|
|
||||||
if (pContext->numOfTry >= pContext->ipSet.numOfIps) {
|
if (pContext->numOfTry >= pContext->ipSet.numOfIps) {
|
||||||
rpcMsg.msgType = pContext->msgType+1;
|
rpcMsg.msgType = pContext->msgType+1;
|
||||||
rpcMsg.handle = pContext->ahandle;
|
rpcMsg.ahandle = pContext->ahandle;
|
||||||
rpcMsg.code = pContext->code;
|
rpcMsg.code = pContext->code;
|
||||||
rpcMsg.pCont = NULL;
|
rpcMsg.pCont = NULL;
|
||||||
rpcMsg.contLen = 0;
|
rpcMsg.contLen = 0;
|
||||||
|
|
|
@ -33,7 +33,7 @@ typedef struct {
|
||||||
} SInfo;
|
} SInfo;
|
||||||
|
|
||||||
static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
|
static void processResponse(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
|
||||||
SInfo *pInfo = (SInfo *)pMsg->handle;
|
SInfo *pInfo = (SInfo *)pMsg->ahandle;
|
||||||
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
|
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
|
||||||
|
|
||||||
if (pIpSet) pInfo->ipSet = *pIpSet;
|
if (pIpSet) pInfo->ipSet = *pIpSet;
|
||||||
|
@ -46,7 +46,7 @@ static int tcount = 0;
|
||||||
|
|
||||||
static void *sendRequest(void *param) {
|
static void *sendRequest(void *param) {
|
||||||
SInfo *pInfo = (SInfo *)param;
|
SInfo *pInfo = (SInfo *)param;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg = {0};
|
||||||
|
|
||||||
tDebug("thread:%d, start to send request", pInfo->index);
|
tDebug("thread:%d, start to send request", pInfo->index);
|
||||||
|
|
||||||
|
@ -54,7 +54,7 @@ static void *sendRequest(void *param) {
|
||||||
pInfo->num++;
|
pInfo->num++;
|
||||||
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
rpcMsg.contLen = pInfo->msgSize;
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
rpcMsg.handle = pInfo;
|
rpcMsg.ahandle = pInfo;
|
||||||
rpcMsg.msgType = 1;
|
rpcMsg.msgType = 1;
|
||||||
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg);
|
rpcSendRequest(pInfo->pRpc, &pInfo->ipSet, &rpcMsg);
|
||||||
|
|
|
@ -184,6 +184,7 @@ uint32_t ip2uint(const char *const ip_addr);
|
||||||
|
|
||||||
void taosRemoveDir(char *rootDir);
|
void taosRemoveDir(char *rootDir);
|
||||||
int tmkdir(const char *pathname, mode_t mode);
|
int tmkdir(const char *pathname, mode_t mode);
|
||||||
|
void taosMvDir(char* destDir, char *srcDir);
|
||||||
|
|
||||||
#define TAOS_ALLOC_MODE_DEFAULT 0
|
#define TAOS_ALLOC_MODE_DEFAULT 0
|
||||||
#define TAOS_ALLOC_MODE_RANDOM_FAIL 1
|
#define TAOS_ALLOC_MODE_RANDOM_FAIL 1
|
||||||
|
|
|
@ -799,3 +799,13 @@ int tmkdir(const char *path, mode_t mode) {
|
||||||
if (code < 0 && errno == EEXIST) code = 0;
|
if (code < 0 && errno == EEXIST) code = 0;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosMvDir(char* destDir, char *srcDir) {
|
||||||
|
char shellCmd[1024+1] = {0};
|
||||||
|
|
||||||
|
//(void)snprintf(shellCmd, 1024, "cp -rf %s %s", srcDir, destDir);
|
||||||
|
(void)snprintf(shellCmd, 1024, "mv %s %s", srcDir, destDir);
|
||||||
|
tSystem(shellCmd);
|
||||||
|
uInfo("shell cmd:%s is executed", shellCmd);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -347,6 +347,7 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
|
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
|
||||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||||
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
|
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
|
||||||
|
taosMvDir(tsVnodeBakDir, rootDir);
|
||||||
taosRemoveDir(rootDir);
|
taosRemoveDir(rootDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue