Merge branch '2.0' into liaohj_2
This commit is contained in:
commit
b8fc28fb95
|
@ -380,7 +380,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
|
||||||
void tscInitMsgs();
|
void tscInitMsgs();
|
||||||
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
||||||
|
|
||||||
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code);
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
|
||||||
int tscProcessSql(SSqlObj *pSql);
|
int tscProcessSql(SSqlObj *pSql);
|
||||||
|
|
||||||
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
|
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
|
|
|
@ -324,7 +324,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
void tscProcessAsyncRes(SSchedMsg *pMsg) {
|
void tscProcessAsyncRes(SSchedMsg *pMsg) {
|
||||||
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
|
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
|
||||||
STscObj *pTscObj = pSql->pTscObj;
|
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
|
||||||
|
|
|
@ -98,6 +98,7 @@ void tscSetMgmtIpList(SRpcIpSet *pIpList) {
|
||||||
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
|
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
|
||||||
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
|
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
|
||||||
*/
|
*/
|
||||||
|
UNUSED_FUNC
|
||||||
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
||||||
int32_t factor = 2;
|
int32_t factor = 2;
|
||||||
return tscMgmtIpList.numOfIps * factor;
|
return tscMgmtIpList.numOfIps * factor;
|
||||||
|
@ -188,23 +189,34 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
|
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
|
||||||
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
|
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = pCmd->msgType, .contLen = pCmd->payloadLen, .pCont = pMsg, .handle = pSql};
|
SRpcMsg rpcMsg = {
|
||||||
rpcSendRequest(pVnodeConn, pSql->ipList, &msg);
|
.msgType = pSql->cmd.msgType,
|
||||||
|
.pCont = pMsg,
|
||||||
|
.contLen = pSql->cmd.payloadLen,
|
||||||
|
.handle = pSql,
|
||||||
|
.code = 0
|
||||||
|
};
|
||||||
|
rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
pSql->ipList->port = tsMgmtShellPort;
|
pSql->ipList->port = tsMgmtShellPort;
|
||||||
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
|
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
|
||||||
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
|
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
SRpcMsg msg = {.msgType = pCmd->msgType, .contLen = pCmd->payloadLen, .pCont = pMsg, .handle = pSql};
|
.msgType = pSql->cmd.msgType,
|
||||||
rpcSendRequest(pTscMgmtConn, pSql->ipList, &msg);
|
.pCont = pMsg,
|
||||||
|
.contLen = pSql->cmd.payloadLen,
|
||||||
|
.handle = pSql,
|
||||||
|
.code = 0
|
||||||
|
};
|
||||||
|
rpcSendRequest(pTscMgmtConn, pSql->ipList, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
|
||||||
tscPrint("response:%s is received, len:%d error:%s", taosMsg[(uint8_t)type], contLen, tstrerror(code));
|
tscPrint("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
|
||||||
SSqlObj *pSql = (SSqlObj *)ahandle;
|
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
|
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
|
||||||
return;
|
return;
|
||||||
|
@ -213,24 +225,24 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
STscObj *pObj = pSql->pTscObj;
|
STscObj *pObj = pSql->pTscObj;
|
||||||
tscTrace("%p msg:%p is received from server", pSql, pCont);
|
tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
|
||||||
|
|
||||||
if (pSql->freed || pObj->signature != pObj) {
|
if (pSql->freed || pObj->signature != pObj) {
|
||||||
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
|
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
|
||||||
pObj, pObj->signature);
|
pObj, pObj->signature);
|
||||||
tscFreeSqlObj(pSql);
|
tscFreeSqlObj(pSql);
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCont == NULL) {
|
if (rpcMsg->pCont == NULL) {
|
||||||
code = TSDB_CODE_NETWORK_UNAVAIL;
|
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
} else {
|
} else {
|
||||||
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
|
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
|
||||||
if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID ||
|
if (rpcMsg->code == TSDB_CODE_NOT_ACTIVE_TABLE || rpcMsg->code == TSDB_CODE_INVALID_TABLE_ID ||
|
||||||
code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE ||
|
rpcMsg->code == TSDB_CODE_INVALID_VNODE_ID || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_VNODE ||
|
||||||
code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION ||
|
rpcMsg->code == TSDB_CODE_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_NOT_ACTIVE_SESSION ||
|
||||||
code == TSDB_CODE_TABLE_ID_MISMATCH) {
|
rpcMsg->code == TSDB_CODE_TABLE_ID_MISMATCH) {
|
||||||
/*
|
/*
|
||||||
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
|
* not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized,
|
||||||
* the virtual node may have not create table till now, so try again by using the new metermeta.
|
* the virtual node may have not create table till now, so try again by using the new metermeta.
|
||||||
|
@ -242,24 +254,24 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
|
* not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore.
|
||||||
*/
|
*/
|
||||||
if (pCmd->command == TSDB_SQL_CONNECT) {
|
if (pCmd->command == TSDB_SQL_CONNECT) {
|
||||||
code = TSDB_CODE_NETWORK_UNAVAIL;
|
rpcMsg->code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
} else if (pCmd->command == TSDB_SQL_HB) {
|
} else if (pCmd->command == TSDB_SQL_HB) {
|
||||||
code = TSDB_CODE_NOT_READY;
|
rpcMsg->code = TSDB_CODE_NOT_READY;
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tscTrace("%p it shall renew meter meta, code:%d", pSql, code);
|
tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code);
|
||||||
|
|
||||||
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
|
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
|
||||||
pSql->res.code = (uint8_t) code; // keep the previous error code
|
pSql->res.code = rpcMsg->code; // keep the previous error code
|
||||||
|
|
||||||
code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
|
rpcMsg->code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name);
|
||||||
|
|
||||||
if (pMeterMetaInfo->pMeterMeta) {
|
if (pMeterMetaInfo->pMeterMeta) {
|
||||||
tscSendMsgToServer(pSql);
|
tscSendMsgToServer(pSql);
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -272,16 +284,16 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
|
|
||||||
pRes->rspLen = 0;
|
pRes->rspLen = 0;
|
||||||
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
|
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
|
||||||
pRes->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_NETWORK_UNAVAIL;
|
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
} else {
|
} else {
|
||||||
tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
|
tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
|
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
|
||||||
assert(type == pCmd->msgType + 1);
|
assert(rpcMsg->msgType == pCmd->msgType + 1);
|
||||||
pRes->code = (int32_t)code;
|
pRes->code = (int32_t)rpcMsg->code;
|
||||||
pRes->rspType = type;
|
pRes->rspType = rpcMsg->msgType;
|
||||||
pRes->rspLen = contLen;
|
pRes->rspLen = rpcMsg->contLen;
|
||||||
|
|
||||||
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
|
@ -289,7 +301,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
} else {
|
} else {
|
||||||
pRes->pRsp = tmp;
|
pRes->pRsp = tmp;
|
||||||
if (pRes->rspLen) {
|
if (pRes->rspLen) {
|
||||||
memcpy(pRes->pRsp, pCont, pRes->rspLen);
|
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,8 +314,8 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
* There is not response callback function for submit response.
|
* There is not response callback function for submit response.
|
||||||
* The actual inserted number of points is the first number.
|
* The actual inserted number of points is the first number.
|
||||||
*/
|
*/
|
||||||
if (type == TSDB_MSG_TYPE_SUBMIT_RSP) {
|
if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) {
|
||||||
SShellSubmitRspMsg *pMsg = pRes->pRsp;
|
SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
|
||||||
pMsg->code = htonl(pMsg->code);
|
pMsg->code = htonl(pMsg->code);
|
||||||
pMsg->numOfRows = htonl(pMsg->numOfRows);
|
pMsg->numOfRows = htonl(pMsg->numOfRows);
|
||||||
pMsg->affectedRows = htonl(pMsg->affectedRows);
|
pMsg->affectedRows = htonl(pMsg->affectedRows);
|
||||||
|
@ -322,14 +334,14 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
tsem_post(&pSql->rspSem);
|
tsem_post(&pSql->rspSem);
|
||||||
} else {
|
} else {
|
||||||
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
|
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
|
||||||
code = (*tscProcessMsgRsp[pCmd->command])(pSql);
|
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||||
|
|
||||||
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
int command = pCmd->command;
|
int command = pCmd->command;
|
||||||
void *taosres = tscKeepConn[command] ? pSql : NULL;
|
void *taosres = tscKeepConn[command] ? pSql : NULL;
|
||||||
code = pRes->code ? -pRes->code : pRes->numOfRows;
|
rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;
|
||||||
|
|
||||||
tscTrace("%p Async SQL result:%d res:%p", pSql, code, taosres);
|
tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
|
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
|
||||||
|
@ -341,9 +353,9 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
*/
|
*/
|
||||||
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
|
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
|
||||||
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
|
if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation
|
||||||
(*pSql->fp)(pSql, taosres, code);
|
(*pSql->fp)(pSql, taosres, rpcMsg->code);
|
||||||
} else {
|
} else {
|
||||||
(*pSql->fp)(pSql->param, taosres, code);
|
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldFree) {
|
if (shouldFree) {
|
||||||
|
@ -359,7 +371,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
|
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj);
|
||||||
|
@ -1212,7 +1224,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pStart = pSql->cmd.payload + tsRpcHeadSize;
|
pStart = pSql->cmd.payload + tsRpcHeadSize;
|
||||||
pMsg = pStart;
|
pMsg = pStart;
|
||||||
|
|
||||||
SRetrieveTableMsg *pRetrieveMsg = (SShellSubmitMsg *)pMsg;
|
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
|
||||||
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
|
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
|
||||||
pMsg += sizeof(pSql->res.qhandle);
|
pMsg += sizeof(pSql->res.qhandle);
|
||||||
|
|
||||||
|
@ -1227,13 +1239,13 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
|
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
|
||||||
SShellSubmitMsg *pShellMsg;
|
//SShellSubmitMsg *pShellMsg;
|
||||||
char * pMsg;
|
//char * pMsg;
|
||||||
SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
//SMeterMetaInfo * pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
||||||
|
|
||||||
STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
|
//STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
|
||||||
|
|
||||||
pMsg = buf + tsRpcHeadSize;
|
//pMsg = buf + tsRpcHeadSize;
|
||||||
|
|
||||||
//TODO set iplist
|
//TODO set iplist
|
||||||
//pShellMsg = (SShellSubmitMsg *)pMsg;
|
//pShellMsg = (SShellSubmitMsg *)pMsg;
|
||||||
|
@ -2005,7 +2017,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
|
memcpy(pMsg, &pInfo->pCreateTableInfo->usingInfo.tagdata, sizeof(STagData));
|
||||||
pMsg += sizeof(STagData);
|
pMsg += sizeof(STagData);
|
||||||
} else { // create (super) table
|
} else { // create (super) table
|
||||||
pSchema = pCreateTableMsg->schema;
|
pSchema = (SSchema *)pCreateTableMsg->schema;
|
||||||
|
|
||||||
for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
|
for (int i = 0; i < pCmd->numOfCols + pCmd->count; ++i) {
|
||||||
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
|
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
|
||||||
|
@ -2045,7 +2057,7 @@ int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
|
||||||
|
|
||||||
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
SAlterTableMsg *pAlterTableMsg;
|
SAlterTableMsg *pAlterTableMsg;
|
||||||
char * pMsg, *pStart;
|
char * pMsg;
|
||||||
int msgLen = 0;
|
int msgLen = 0;
|
||||||
int size = 0;
|
int size = 0;
|
||||||
|
|
||||||
|
@ -2563,7 +2575,6 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
|
int tscProcessMeterMetaRsp(SSqlObj *pSql) {
|
||||||
STableMeta *pMeta;
|
STableMeta *pMeta;
|
||||||
SSchema * pSchema;
|
SSchema * pSchema;
|
||||||
uint8_t ieType;
|
|
||||||
|
|
||||||
pMeta = (STableMeta *)pSql->res.pRsp;
|
pMeta = (STableMeta *)pSql->res.pRsp;
|
||||||
|
|
||||||
|
@ -2667,7 +2678,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
for (i = 0; i < totalNum; i++) {
|
for (i = 0; i < totalNum; i++) {
|
||||||
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp;
|
||||||
STableMeta * pMeta = &pMultiMeta->metas;
|
STableMeta * pMeta = pMultiMeta->metas;
|
||||||
|
|
||||||
pMeta->sid = htonl(pMeta->sid);
|
pMeta->sid = htonl(pMeta->sid);
|
||||||
pMeta->sversion = htons(pMeta->sversion);
|
pMeta->sversion = htons(pMeta->sversion);
|
||||||
|
@ -3029,7 +3040,7 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
|
||||||
int tscProcessQueryRsp(SSqlObj *pSql) {
|
int tscProcessQueryRsp(SSqlObj *pSql) {
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
|
||||||
SQueryTableRsp *pQuery = (SRetrieveTableRsp *)pRes->pRsp;
|
SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
|
||||||
pQuery->qhandle = htobe64(pQuery->qhandle);
|
pQuery->qhandle = htobe64(pQuery->qhandle);
|
||||||
pRes->qhandle = pQuery->qhandle;
|
pRes->qhandle = pQuery->qhandle;
|
||||||
|
|
||||||
|
@ -3041,7 +3052,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
|
||||||
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
|
int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
STscObj *pObj = pSql->pTscObj;
|
|
||||||
|
|
||||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
|
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ int32_t tscInitRpc(const char *user, const char *secret) {
|
||||||
rpcInit.cfp = tscProcessMsgFromServer;
|
rpcInit.cfp = tscProcessMsgFromServer;
|
||||||
rpcInit.sessions = tsMaxVnodeConnections;
|
rpcInit.sessions = tsMaxVnodeConnections;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.user = user;
|
rpcInit.user = (char*)user;
|
||||||
rpcInit.ckey = "key";
|
rpcInit.ckey = "key";
|
||||||
rpcInit.secret = secretEncrypt;
|
rpcInit.secret = secretEncrypt;
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
ADD_EXECUTABLE(taosd ${SRC})
|
ADD_EXECUTABLE(taosd ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http tsdb)
|
TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http)
|
||||||
|
|
||||||
#IF (TD_CLUSTER)
|
#IF (TD_CLUSTER)
|
||||||
# TARGET_LINK_LIBRARIES(taosd dcluster)
|
# TARGET_LINK_LIBRARIES(taosd dcluster)
|
||||||
|
@ -24,7 +24,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
COMMAND echo "make test directory"
|
COMMAND echo "make test directory"
|
||||||
DEPENDS taosd
|
DEPENDS taosd
|
||||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
|
||||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
|
COMMAND ${CMAKE_COMMAND} -E make_directoryF ${TD_TESTS_OUTPUT_DIR}/log/
|
||||||
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
|
||||||
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_DNODE_MCLIENT_H
|
||||||
|
#define TDENGINE_DNODE_MCLIENT_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t dnodeInitMClient();
|
||||||
|
void dnodeCleanupMClient();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -20,18 +20,17 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdint.h>
|
int dnodeInitMgmt();
|
||||||
#include <stdbool.h>
|
void dnodeCleanupMgmt();
|
||||||
|
void dnodeMgmt(SRpcMsg *);
|
||||||
|
|
||||||
int32_t dnodeInitMgmt();
|
void* dnodeGetVnode(int vgId);
|
||||||
void dnodeInitMgmtIp();
|
int dnodeGetVnodeStatus(void *);
|
||||||
|
void* dnodeGetVnodeRworker(void *);
|
||||||
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
|
void* dnodeGetVnodeWworker(void *);
|
||||||
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen);
|
void* dnodeGetVnodeWal(void *);
|
||||||
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
|
void* dnodeGetVnodeTsdb(void *);
|
||||||
|
void dnodeReleaseVnode(void *);
|
||||||
void dnodeSendVnodeCfgMsg(int32_t vnode);
|
|
||||||
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_DNODE_MNODE_H
|
||||||
|
#define TDENGINE_DNODE_MNODE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t dnodeInitMnode();
|
||||||
|
void dnodeCleanupMnode();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -20,16 +20,10 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdbool.h>
|
|
||||||
#include <pthread.h>
|
|
||||||
|
|
||||||
void dnodeAllocModules();
|
void dnodeAllocModules();
|
||||||
int32_t dnodeInitModules();
|
int32_t dnodeInitModules();
|
||||||
void dnodeCleanUpModules();
|
void dnodeCleanUpModules();
|
||||||
|
|
||||||
extern void (*dnodeStartModules)();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -20,31 +20,12 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdbool.h>
|
int dnodeInitRead();
|
||||||
#include <stdint.h>
|
void dnodeCleanupRead();
|
||||||
#include "taosdef.h"
|
void dnodeRead(SRpcMsg *);
|
||||||
#include "taosmsg.h"
|
void *dnodeAllocateReadWorker();
|
||||||
|
void dnodeFreeReadWorker(void *rqueue);
|
||||||
|
|
||||||
/*
|
|
||||||
* handle query message, and the result is returned by callback function
|
|
||||||
*/
|
|
||||||
void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Dispose retrieve msg, and the result will passed through callback function
|
|
||||||
*/
|
|
||||||
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn);
|
|
||||||
void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Fill retrieve result according to query info
|
|
||||||
*/
|
|
||||||
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Get the size of retrieve result according to query info
|
|
||||||
*/
|
|
||||||
int32_t dnodeGetRetrieveDataSize(void *pQInfo);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,27 +20,9 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdint.h>
|
|
||||||
#include "dnode.h"
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int sid;
|
|
||||||
uint32_t ip;
|
|
||||||
uint16_t port;
|
|
||||||
int32_t count; // track the number of imports
|
|
||||||
int32_t code; // track the code of imports
|
|
||||||
int32_t numOfTotalPoints; // track the total number of points imported
|
|
||||||
void *thandle; // handle from TAOS layer
|
|
||||||
void *qhandle;
|
|
||||||
} SShellObj;
|
|
||||||
|
|
||||||
int32_t dnodeInitShell();
|
int32_t dnodeInitShell();
|
||||||
|
|
||||||
void dnodeCleanupShell();
|
void dnodeCleanupShell();
|
||||||
|
|
||||||
//SDnodeStatisInfo dnodeGetStatisInfo()
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -20,34 +20,15 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdbool.h>
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_DNODE_RUN_STATUS_INITIALIZE,
|
TSDB_DNODE_RUN_STATUS_INITIALIZE,
|
||||||
TSDB_DNODE_RUN_STATUS_RUNING,
|
TSDB_DNODE_RUN_STATUS_RUNING,
|
||||||
TSDB_DNODE_RUN_STATUS_STOPPED
|
TSDB_DNODE_RUN_STATUS_STOPPED
|
||||||
} SDnodeRunStatus;
|
} SDnodeRunStatus;
|
||||||
|
|
||||||
extern int32_t (*dnodeInitPeers)(int32_t numOfThreads);
|
|
||||||
extern int32_t (*dnodeCheckSystem)();
|
|
||||||
extern int32_t (*dnodeInitStorage)();
|
|
||||||
extern void (*dnodeCleanupStorage)();
|
|
||||||
extern int32_t tsMaxQueues;
|
|
||||||
extern void ** tsRpcQhandle;
|
|
||||||
extern void *tsQueryQhandle;
|
|
||||||
extern void *tsDnodeMgmtQhandle;
|
|
||||||
extern void *tsDnodeTmr;
|
|
||||||
|
|
||||||
int32_t dnodeInitSystem();
|
int32_t dnodeInitSystem();
|
||||||
void dnodeCleanUpSystem();
|
void dnodeCleanUpSystem();
|
||||||
void dnodeInitPlugins();
|
|
||||||
|
|
||||||
SDnodeRunStatus dnodeGetRunStatus();
|
SDnodeRunStatus dnodeGetRunStatus();
|
||||||
void dnodeSetRunStatus(SDnodeRunStatus status);
|
|
||||||
void dnodeCheckDataDirOpenned(const char *dir);
|
|
||||||
void dnodeLockVnodes();
|
|
||||||
void dnodeUnLockVnodes();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,77 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef TDENGINE_DNODE_VNODE_MGMT_H
|
|
||||||
#define TDENGINE_DNODE_VNODE_MGMT_H
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdbool.h>
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "taosmsg.h"
|
|
||||||
#include "tstatus.h"
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Open all Vnodes in the local data directory
|
|
||||||
*/
|
|
||||||
int32_t dnodeOpenVnodes();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Close all Vnodes that have been created and opened
|
|
||||||
*/
|
|
||||||
int32_t dnodeCleanupVnodes();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Check if vnode already exists
|
|
||||||
*/
|
|
||||||
bool dnodeCheckVnodeExist(int32_t vid);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create vnode with specified configuration and open it
|
|
||||||
* if exist, config it
|
|
||||||
*/
|
|
||||||
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Remove vnode from local repository
|
|
||||||
*/
|
|
||||||
int32_t dnodeDropVnode(int32_t vnode);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Get the vnode object that has been opened
|
|
||||||
*/
|
|
||||||
//tsdb_repo_t* dnodeGetVnode(int vid);
|
|
||||||
void* dnodeGetVnode(int32_t vnode);
|
|
||||||
|
|
||||||
int32_t dnodeGetVnodesNum();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* get the status of vnode
|
|
||||||
*/
|
|
||||||
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Check if vnode already exists, and table exist in this vnode
|
|
||||||
*/
|
|
||||||
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif
|
|
|
@ -20,41 +20,12 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <stdbool.h>
|
int dnodeInitWrite();
|
||||||
#include <stdint.h>
|
void dnodeCleanupWrite();
|
||||||
#include "taosdef.h"
|
void dnodeWrite(SRpcMsg *pMsg);
|
||||||
#include "taosmsg.h"
|
void *dnodeAllocateWriteWorker();
|
||||||
|
void dnodeFreeWriteWorker(void *worker);
|
||||||
|
|
||||||
/*
|
|
||||||
* Write data based on dnode, the detail result can be fetched from rsponse
|
|
||||||
* pSubmit: Data to be written
|
|
||||||
* pConn: Communication handle
|
|
||||||
* callback: Pass the write result through a callback function, possibly in a different thread space
|
|
||||||
* rsp: will not be freed by callback function
|
|
||||||
*/
|
|
||||||
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create table with specified configuration and open it
|
|
||||||
* if table already exist, update its schema and tag
|
|
||||||
*/
|
|
||||||
int32_t dnodeCreateTable(SDCreateTableMsg *pTable);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Remove table from local repository
|
|
||||||
*/
|
|
||||||
int32_t dnodeDropTable(SDRemoveTableMsg *pTable);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Create stream
|
|
||||||
* if stream already exist, update it
|
|
||||||
*/
|
|
||||||
int32_t dnodeCreateStream(SDAlterStreamMsg *pStream);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Remove all child tables of supertable from local repository
|
|
||||||
*/
|
|
||||||
int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "dnodeSystem.h"
|
||||||
|
|
||||||
|
static void (*dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
|
||||||
|
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
|
||||||
|
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
|
||||||
|
static void *tsDnodeMClientRpc;
|
||||||
|
|
||||||
|
int32_t dnodeInitMClient() {
|
||||||
|
dnodeProcessMgmtRspFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessStatusRsp;
|
||||||
|
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "DND-MC";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = dnodeProcessRspFromMnode;
|
||||||
|
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
|
||||||
|
tsDnodeMClientRpc = rpcOpen(&rpcInit);
|
||||||
|
if (tsDnodeMClientRpc == NULL) {
|
||||||
|
dError("failed to init connection from mgmt");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dPrint("client connection to mgmt is opened");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeCleanupMClient() {
|
||||||
|
if (tsDnodeMClientRpc) {
|
||||||
|
rpcClose(tsDnodeMClientRpc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
|
if (dnodeProcessMgmtRspFp[pMsg->msgType]) {
|
||||||
|
(*dnodeProcessMgmtRspFp[pMsg->msgType])(pMsg);
|
||||||
|
} else {
|
||||||
|
dError("%s is not processed", taosMsg[pMsg->msgType]);
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -15,327 +15,219 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosmsg.h"
|
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsched.h"
|
|
||||||
#include "tsystem.h"
|
|
||||||
#include "mnode.h"
|
|
||||||
#include "dnode.h"
|
|
||||||
#include "dnodeSystem.h"
|
|
||||||
#include "dnodeMgmt.h"
|
|
||||||
#include "dnodeWrite.h"
|
#include "dnodeWrite.h"
|
||||||
#include "dnodeVnodeMgmt.h"
|
#include "dnodeRead.h"
|
||||||
|
#include "dnodeMgmt.h"
|
||||||
|
|
||||||
void (*dnodeInitMgmtIpFp)() = NULL;
|
typedef struct {
|
||||||
int32_t (*dnodeInitMgmtFp)() = NULL;
|
int32_t vgId; // global vnode group ID
|
||||||
void (*dnodeCleanUpMgmtFp)() = NULL;
|
int status; // status: master, slave, notready, deleting
|
||||||
void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
|
int refCount; // reference count
|
||||||
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
|
int64_t version;
|
||||||
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
|
void *wworker;
|
||||||
|
void *rworker;
|
||||||
|
void *wal;
|
||||||
|
void *tsdb;
|
||||||
|
void *replica;
|
||||||
|
void *events;
|
||||||
|
void *cq; // continuous query
|
||||||
|
} SVnodeObj;
|
||||||
|
|
||||||
static void *tsStatusTimer = NULL;
|
static int dnodeOpenVnodes();
|
||||||
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
|
static void dnodeCleanupVnodes();
|
||||||
static void dnodeInitProcessShellMsg();
|
static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg);
|
||||||
|
static int dnodeDropVnode(SVnodeObj *pVnode);
|
||||||
|
static void dnodeRemoveVnode(SVnodeObj *pVnode);
|
||||||
|
|
||||||
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
|
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
||||||
int32_t contLen = *(int32_t *) (sched->msg - 4);
|
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||||
int32_t code = *(int32_t *) (sched->msg - 8);
|
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
||||||
int8_t msgType = *(int8_t *) (sched->msg - 9);
|
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
||||||
void *handle = sched->ahandle;
|
|
||||||
int8_t *pCont = sched->msg;
|
|
||||||
|
|
||||||
mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code);
|
int dnodeInitMgmt() {
|
||||||
|
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessDropVnodeMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
|
void dnodeCleanupMgmt() {
|
||||||
dTrace("msg:%d:%s is sent to mnode", msgType, taosMsg[msgType]);
|
|
||||||
if (dnodeSendMsgToMnodeFp) {
|
}
|
||||||
dnodeSendMsgToMnodeFp(msgType, pCont, contLen);
|
|
||||||
|
void dnodeMgmt(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
|
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
||||||
|
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
||||||
} else {
|
} else {
|
||||||
if (pCont == NULL) {
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
pCont = rpcMallocCont(1);
|
|
||||||
contLen = 0;
|
|
||||||
}
|
|
||||||
SSchedMsg schedMsg = {0};
|
|
||||||
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
|
|
||||||
schedMsg.msg = pCont;
|
|
||||||
*(int32_t *) (pCont - 4) = contLen;
|
|
||||||
*(int32_t *) (pCont - 8) = TSDB_CODE_SUCCESS;
|
|
||||||
*(int8_t *) (pCont - 9) = msgType;
|
|
||||||
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRpcMsg rsp;
|
||||||
|
rsp.handle = pMsg->handle;
|
||||||
|
rsp.code = terrno;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
rpcFreeCont(pMsg->pCont); // free the received message
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
|
void *dnodeGetVnode(int vgId) {
|
||||||
dTrace("rsp:%d:%s is sent to mnode, pConn:%p", msgType, taosMsg[msgType], pConn);
|
SVnodeObj *pVnode;
|
||||||
if (dnodeSendRspToMnodeFp) {
|
|
||||||
dnodeSendRspToMnodeFp(pConn, code, pCont, contLen);
|
// retrieve the pVnode from vgId
|
||||||
} else {
|
|
||||||
//hack way
|
|
||||||
if (pCont == NULL) {
|
// if (pVnode->status == ....) {
|
||||||
pCont = rpcMallocCont(1);
|
// terrno = ;
|
||||||
contLen = 0;
|
// return NULL;
|
||||||
}
|
// }
|
||||||
SSchedMsg schedMsg = {0};
|
|
||||||
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
schedMsg.msg = pCont;
|
return pVnode;
|
||||||
schedMsg.ahandle = pConn;
|
|
||||||
*(int32_t *) (pCont - 4) = contLen;
|
|
||||||
*(int32_t *) (pCont - 8) = code;
|
|
||||||
*(int8_t *) (pCont - 9) = msgType;
|
|
||||||
taosScheduleTask(tsDnodeMgmtQhandle, &schedMsg);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) {
|
int dnodeGetVnodeStatus(void *pVnode) {
|
||||||
taosTmrReset(dnodeSendStatusMsgToMgmt, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
|
return ((SVnodeObj *)pVnode)->status;
|
||||||
if (tsStatusTimer == NULL) {
|
|
||||||
dError("Failed to start status timer");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad);
|
|
||||||
SStatusMsg *pStatus = rpcMallocCont(contLen);
|
|
||||||
if (pStatus == NULL) {
|
|
||||||
dError("Failed to malloc status message");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t totalVnodes = dnodeGetVnodesNum();
|
|
||||||
|
|
||||||
pStatus->version = htonl(tsVersion);
|
|
||||||
pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
|
|
||||||
pStatus->publicIp = htonl(inet_addr(tsPublicIp));
|
|
||||||
pStatus->lastReboot = htonl(tsRebootTime);
|
|
||||||
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
|
|
||||||
pStatus->openVnodes = htons((uint16_t) totalVnodes);
|
|
||||||
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
|
|
||||||
pStatus->diskAvailable = tsAvailDataDirGB;
|
|
||||||
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
|
|
||||||
|
|
||||||
SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load;
|
|
||||||
|
|
||||||
//TODO loop all vnodes
|
|
||||||
// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) {
|
|
||||||
// if (vnodeList[vnode].cfg.maxSessions <= 0) continue;
|
|
||||||
//
|
|
||||||
// SVnodeObj *pVnode = vnodeList + vnode;
|
|
||||||
// pLoad->vnode = htonl(vnode);
|
|
||||||
// pLoad->vgId = htonl(pVnode->cfg.vgId);
|
|
||||||
// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus;
|
|
||||||
// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus;
|
|
||||||
// pLoad->accessState = (uint8_t)(pVnode->accessState);
|
|
||||||
// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage);
|
|
||||||
// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage);
|
|
||||||
// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) {
|
|
||||||
// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten);
|
|
||||||
// } else {
|
|
||||||
// pLoad->pointsWritten = htobe64(0);
|
|
||||||
// }
|
|
||||||
// pLoad++;
|
|
||||||
//
|
|
||||||
// if (++count >= tsOpenVnodes) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *dnodeGetVnodeWworker(void *pVnode) {
|
||||||
|
return ((SVnodeObj *)pVnode)->wworker;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t dnodeInitMgmt() {
|
void *dnodeGetVnodeRworker(void *pVnode) {
|
||||||
if (dnodeInitMgmtFp) {
|
return ((SVnodeObj *)pVnode)->rworker;
|
||||||
dnodeInitMgmtFp();
|
}
|
||||||
}
|
|
||||||
|
void *dnodeGetVnodeWal(void *pVnode) {
|
||||||
|
return ((SVnodeObj *)pVnode)->wal;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *dnodeGetVnodeTsdb(void *pVnode) {
|
||||||
|
return ((SVnodeObj *)pVnode)->tsdb;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeReleaseVnode(void *param) {
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
|
|
||||||
|
int refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||||
|
if (refCount == 0) dnodeRemoveVnode(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dnodeOpenVnode() {
|
||||||
|
SVnodeObj *pVnode;
|
||||||
|
|
||||||
|
// create tsdb
|
||||||
|
|
||||||
|
// create wal
|
||||||
|
|
||||||
|
// allocate write worker
|
||||||
|
pVnode->wworker = dnodeAllocateWriteWorker();
|
||||||
|
|
||||||
|
// create read queue
|
||||||
|
pVnode->rworker = dnodeAllocateReadWorker();
|
||||||
|
|
||||||
|
// create the replica
|
||||||
|
|
||||||
|
// set the status
|
||||||
|
|
||||||
|
pVnode->refCount = 1;
|
||||||
|
|
||||||
dnodeInitProcessShellMsg();
|
|
||||||
taosTmrReset(dnodeSendStatusMsgToMgmt, 500, NULL, tsDnodeTmr, &tsStatusTimer);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeInitMgmtIp() {
|
static int dnodeOpenVnodes() {
|
||||||
if (dnodeInitMgmtIpFp) {
|
return 0;
|
||||||
dnodeInitMgmtIpFp();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeCleanUpMgmt() {
|
static void dnodeCleanupVnode() {
|
||||||
if (tsStatusTimer != NULL) {
|
|
||||||
taosTmrStopA(&tsStatusTimer);
|
|
||||||
tsStatusTimer = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dnodeCleanUpMgmtFp) {
|
|
||||||
dnodeCleanUpMgmtFp();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
static void dnodeCleanupVnodes() {
|
||||||
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
|
|
||||||
dError("invalid msg type:%d", msgType);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
|
|
||||||
|
|
||||||
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) {
|
|
||||||
dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn);
|
|
||||||
}
|
|
||||||
if (dnodeProcessMgmtMsgFp[msgType]) {
|
|
||||||
(*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn);
|
|
||||||
} else {
|
|
||||||
dError("%s is not processed", taosMsg[msgType]);
|
|
||||||
}
|
|
||||||
|
|
||||||
//rpcFreeCont(pCont);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
static int dnodeCreateVnode(int32_t vgId, SCreateVnodeMsg *cfg) {
|
||||||
SDCreateTableMsg *pTable = pCont;
|
|
||||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
|
||||||
pTable->numOfTags = htons(pTable->numOfTags);
|
|
||||||
pTable->sid = htonl(pTable->sid);
|
|
||||||
pTable->sversion = htonl(pTable->sversion);
|
|
||||||
pTable->tagDataLen = htonl(pTable->tagDataLen);
|
|
||||||
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
|
|
||||||
pTable->contLen = htonl(pTable->contLen);
|
|
||||||
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
|
|
||||||
pTable->uid = htobe64(pTable->uid);
|
|
||||||
pTable->superTableUid = htobe64(pTable->superTableUid);
|
|
||||||
pTable->createdTime = htobe64(pTable->createdTime);
|
|
||||||
|
|
||||||
for (int i = 0; i < pTable->numOfVPeers; ++i) {
|
SVnodeObj *pVnode = malloc(sizeof(SVnodeObj));
|
||||||
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
|
|
||||||
pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t totalCols = pTable->numOfColumns + pTable->numOfTags;
|
// save the vnode info in non-volatile storage
|
||||||
SSchema *pSchema = (SSchema *) pTable->data;
|
|
||||||
for (int32_t col = 0; col < totalCols; ++col) {
|
|
||||||
pSchema->bytes = htons(pSchema->bytes);
|
|
||||||
pSchema->colId = htons(pSchema->colId);
|
|
||||||
pSchema++;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = dnodeCreateTable(pTable);
|
// add into hash, so it can be retrieved
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
dnodeOpenVnode(pVnode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
static void dnodeRemoveVnode(SVnodeObj *pVnode) {
|
||||||
SDAlterStreamMsg *pStream = pCont;
|
|
||||||
pStream->uid = htobe64(pStream->uid);
|
// remove replica
|
||||||
pStream->stime = htobe64(pStream->stime);
|
|
||||||
pStream->vnode = htonl(pStream->vnode);
|
// remove read queue
|
||||||
pStream->sid = htonl(pStream->sid);
|
dnodeFreeReadWorker(pVnode->rworker);
|
||||||
pStream->status = htonl(pStream->status);
|
|
||||||
|
// remove write queue
|
||||||
|
dnodeFreeWriteWorker(pVnode->wworker);
|
||||||
|
|
||||||
|
// remove wal
|
||||||
|
|
||||||
|
// remove tsdb
|
||||||
|
|
||||||
int32_t code = dnodeCreateStream(pStream);
|
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
static int dnodeDropVnode(SVnodeObj *pVnode) {
|
||||||
SDRemoveTableMsg *pTable = pCont;
|
|
||||||
pTable->sid = htonl(pTable->sid);
|
|
||||||
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
|
|
||||||
pTable->uid = htobe64(pTable->uid);
|
|
||||||
|
|
||||||
for (int i = 0; i < pTable->numOfVPeers; ++i) {
|
int count = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||||
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
|
|
||||||
pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = dnodeDropTable(pTable);
|
if (count<=0) dnodeRemoveVnode(pVnode);
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg) {
|
||||||
int32_t code = htonl(*((int32_t *) pCont));
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
// SVnodeObj *pVnode;
|
||||||
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) (pCont + sizeof(int32_t));
|
// int vgId;
|
||||||
dnodeCreateVnode(pVnode);
|
// SVPeersMsg *pCfg;
|
||||||
} else if (code == TSDB_CODE_INVALID_VNODE_ID) {
|
|
||||||
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t));
|
// check everything, if not ok, set terrno;
|
||||||
int32_t vnode = htonl(vpeer->vnode);
|
|
||||||
dError("vnode:%d, not exist, remove it", vnode);
|
|
||||||
dnodeDropVnode(vnode);
|
// everything is ok
|
||||||
} else {
|
|
||||||
dError("code:%d invalid message", code);
|
// dnodeCreateVnode(vgId, pCfg);
|
||||||
}
|
|
||||||
|
//if (pVnode == NULL) terrno = TSDB_CODE
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg) {
|
||||||
int32_t code = htonl(*((int32_t *) pCont));
|
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
SVnodeObj *pVnode;
|
||||||
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
|
int vgId;
|
||||||
dnodeCreateTable(table);
|
|
||||||
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
|
// check everything, if not ok, set terrno;
|
||||||
SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
|
|
||||||
pTable->sid = htonl(pTable->sid);
|
|
||||||
pTable->uid = htobe64(pTable->uid);
|
// everything is ok
|
||||||
dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid);
|
dnodeDropVnode(pVnode);
|
||||||
dnodeDropTable(pTable);
|
|
||||||
} else {
|
//if (pVnode == NULL) terrno = TSDB_CODE
|
||||||
dError("code:%d invalid message", code);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg) {
|
||||||
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont;
|
|
||||||
|
|
||||||
int32_t code = dnodeCreateVnode(pVnode);
|
SVnodeObj *pVnode;
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
int vgId;
|
||||||
|
|
||||||
|
// check everything, if not ok, set terrno;
|
||||||
|
|
||||||
|
|
||||||
|
// everything is ok
|
||||||
|
// dnodeAlterVnode(pVnode);
|
||||||
|
|
||||||
|
//if (pVnode == NULL) terrno = TSDB_CODE
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
|
||||||
SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont;
|
|
||||||
int32_t vnode = htonl(pVnode->vnode);
|
|
||||||
|
|
||||||
int32_t code = dnodeDropVnode(vnode);
|
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
|
||||||
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
|
|
||||||
|
|
||||||
int32_t code = tsCfgDynamicOptions(pCfg->config);
|
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessDropStableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
|
|
||||||
dnodeSendRspToMnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeSendVnodeCfgMsg(int32_t vnode) {
|
|
||||||
SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg));
|
|
||||||
if (cfg == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg->vnode = htonl(vnode);
|
|
||||||
dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg));
|
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) {
|
|
||||||
STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg));
|
|
||||||
if (cfg == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg->vnode = htonl(vnode);
|
|
||||||
dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg));
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeInitProcessShellMsg() {
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessDropStableRequest;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
|
|
||||||
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
|
|
||||||
}
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "dnodeSystem.h"
|
||||||
|
#include "dnodeMgmt.h"
|
||||||
|
#include "dnodeWrite.h"
|
||||||
|
|
||||||
|
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
|
||||||
|
static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg);
|
||||||
|
static void *tsDnodeMnodeRpc = NULL;
|
||||||
|
|
||||||
|
int32_t dnodeInitMnode() {
|
||||||
|
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeWrite;
|
||||||
|
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeWrite;
|
||||||
|
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeMgmt;
|
||||||
|
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
|
||||||
|
|
||||||
|
// note: a new port shall be assigned
|
||||||
|
// rpcInit.localPort = tsDnodeMnodePort;
|
||||||
|
rpcInit.label = "DND-mgmt";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
rpcInit.cfp = dnodeProcessMsgFromMnode;
|
||||||
|
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
|
||||||
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1500;
|
||||||
|
|
||||||
|
tsDnodeMnodeRpc = rpcOpen(&rpcInit);
|
||||||
|
if (tsDnodeMnodeRpc == NULL) {
|
||||||
|
dError("failed to init connection from mgmt");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dPrint("connection to mgmt is opened");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeCleanupMnode() {
|
||||||
|
if (tsDnodeMnodeRpc) {
|
||||||
|
rpcClose(tsDnodeMnodeRpc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
|
||||||
|
SRpcMsg rspMsg;
|
||||||
|
|
||||||
|
rspMsg.handle = pMsg->handle;
|
||||||
|
rspMsg.pCont = NULL;
|
||||||
|
rspMsg.contLen = 0;
|
||||||
|
|
||||||
|
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
|
||||||
|
rspMsg.code = TSDB_CODE_NOT_READY;
|
||||||
|
rpcSendResponse(&rspMsg);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
dTrace("conn:%p, query msg is ignored since dnode not running", pMsg->handle);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
||||||
|
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
||||||
|
} else {
|
||||||
|
dError("%s is not processed", taosMsg[pMsg->msgType]);
|
||||||
|
rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
rpcSendResponse(&rspMsg);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,56 +17,196 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tsched.h"
|
#include "trpc.h"
|
||||||
#include "dnode.h"
|
#include "taosmsg.h"
|
||||||
|
#include "tqueue.h"
|
||||||
#include "dnodeRead.h"
|
#include "dnodeRead.h"
|
||||||
#include "dnodeSystem.h"
|
#include "dnodeMgmt.h"
|
||||||
|
|
||||||
void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) {
|
typedef struct {
|
||||||
dTrace("conn:%p, query msg is disposed", pConn);
|
int32_t code;
|
||||||
void *pQInfo = 100;
|
int32_t count;
|
||||||
callback(TSDB_CODE_SUCCESS, pQInfo, pConn);
|
int32_t numOfVnodes;
|
||||||
}
|
} SRpcContext;
|
||||||
|
|
||||||
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
|
typedef struct {
|
||||||
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
|
void *pCont;
|
||||||
SRetrieveTableMsg *pRetrieve = pSched->msg;
|
int contLen;
|
||||||
void *pConn = pSched->ahandle;
|
SRpcMsg rpcMsg;
|
||||||
|
void *pVnode;
|
||||||
|
SRpcContext *pRpcContext; // RPC message context
|
||||||
|
} SReadMsg;
|
||||||
|
|
||||||
dTrace("conn:%p, retrieve msg is disposed, qhandle:%" PRId64, pConn, pRetrieve->qhandle);
|
static void *dnodeProcessReadQueue(void *param);
|
||||||
|
static void dnodeProcessReadResult(SReadMsg *pRead);
|
||||||
|
static void dnodeHandleIdleReadWorker();
|
||||||
|
static void dnodeProcessQueryMsg(SReadMsg *pMsg);
|
||||||
|
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg);
|
||||||
|
static void (*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode);
|
||||||
|
|
||||||
//examples
|
// module global variable
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
static taos_qset readQset;
|
||||||
void *pQInfo = (void*)pRetrieve->qhandle;
|
static int threads; // number of query threads
|
||||||
|
static int maxThreads;
|
||||||
|
static int minThreads;
|
||||||
|
|
||||||
(*callback)(code, pQInfo, pConn);
|
int dnodeInitRead() {
|
||||||
|
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
|
||||||
|
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;
|
||||||
|
|
||||||
free(pSched->msg);
|
readQset = taosOpenQset();
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
|
minThreads = 3;
|
||||||
dTrace("conn:%p, retrieve msg is received", pConn);
|
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
|
||||||
|
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;
|
||||||
|
|
||||||
void *msg = malloc(sizeof(SRetrieveTableMsg));
|
|
||||||
memcpy(msg, pRetrieve, sizeof(SRetrieveTableMsg));
|
|
||||||
|
|
||||||
SSchedMsg schedMsg;
|
|
||||||
schedMsg.msg = msg;
|
|
||||||
schedMsg.ahandle = pConn;
|
|
||||||
schedMsg.thandle = callbackFp;
|
|
||||||
schedMsg.fp = dnodeExecuteRetrieveData;
|
|
||||||
taosScheduleTask(tsQueryQhandle, &schedMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve) {
|
|
||||||
dTrace("qInfo:%p, data is retrieved");
|
|
||||||
pRetrieve->numOfRows = 0;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {
|
void dnodeCleanupRead() {
|
||||||
dTrace("qInfo:%p, contLen is 100");
|
taosCloseQset(readQset);
|
||||||
return 100;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dnodeRead(SRpcMsg *pMsg) {
|
||||||
|
int leftLen = pMsg->contLen;
|
||||||
|
char *pCont = (char *)pMsg->pCont;
|
||||||
|
int contLen = 0;
|
||||||
|
int numOfVnodes = 0;
|
||||||
|
int32_t vgId = 0;
|
||||||
|
SRpcContext *pRpcContext = NULL;
|
||||||
|
|
||||||
|
// parse head, get number of vnodes;
|
||||||
|
if ( numOfVnodes > 1) {
|
||||||
|
pRpcContext = calloc(sizeof(SRpcContext), 1);
|
||||||
|
pRpcContext->numOfVnodes = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (leftLen > 0) {
|
||||||
|
// todo: parse head, get vgId, contLen
|
||||||
|
|
||||||
|
// get pVnode from vgId
|
||||||
|
void *pVnode = dnodeGetVnode(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// put message into queue
|
||||||
|
SReadMsg readMsg;
|
||||||
|
readMsg.rpcMsg = *pMsg;
|
||||||
|
readMsg.pCont = pCont;
|
||||||
|
readMsg.contLen = contLen;
|
||||||
|
readMsg.pRpcContext = pRpcContext;
|
||||||
|
readMsg.pVnode = pVnode;
|
||||||
|
|
||||||
|
taos_queue queue = dnodeGetVnodeRworker(pVnode);
|
||||||
|
taosWriteQitem(queue, &readMsg);
|
||||||
|
|
||||||
|
// next vnode
|
||||||
|
leftLen -= contLen;
|
||||||
|
pCont -= contLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *dnodeAllocateReadWorker() {
|
||||||
|
|
||||||
|
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
|
||||||
|
if ( queue == NULL ) return NULL;
|
||||||
|
|
||||||
|
taosAddIntoQset(readQset, queue);
|
||||||
|
|
||||||
|
// spawn a thread to process queue
|
||||||
|
if (threads < maxThreads) {
|
||||||
|
pthread_t thread;
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (pthread_create(&thread, &thAttr, dnodeProcessReadQueue, readQset) != 0) {
|
||||||
|
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeFreeReadWorker(void *rqueue) {
|
||||||
|
|
||||||
|
taosCloseQueue(rqueue);
|
||||||
|
|
||||||
|
// dynamically adjust the number of threads
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *dnodeProcessReadQueue(void *param) {
|
||||||
|
taos_qset qset = (taos_qset)param;
|
||||||
|
SReadMsg readMsg;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (taosReadQitemFromQset(qset, &readMsg) <= 0) {
|
||||||
|
dnodeHandleIdleReadWorker();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) {
|
||||||
|
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
}
|
||||||
|
|
||||||
|
dnodeProcessReadResult(&readMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeHandleIdleReadWorker() {
|
||||||
|
int num = taosGetQueueNumber(readQset);
|
||||||
|
|
||||||
|
if (num == 0 || (num <= minThreads && threads > minThreads)) {
|
||||||
|
threads--;
|
||||||
|
pthread_exit(NULL);
|
||||||
|
} else {
|
||||||
|
usleep(100);
|
||||||
|
sched_yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessReadResult(SReadMsg *pRead) {
|
||||||
|
SRpcContext *pRpcContext = pRead->pRpcContext;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
dnodeReleaseVnode(pRead->pVnode);
|
||||||
|
|
||||||
|
if (pRpcContext) {
|
||||||
|
if (terrno) {
|
||||||
|
if (pRpcContext->code == 0) pRpcContext->code = terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = atomic_add_fetch_32(&pRpcContext->count, 1);
|
||||||
|
if (count < pRpcContext->numOfVnodes) {
|
||||||
|
// not over yet, multiple vnodes
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// over, result can be merged now
|
||||||
|
code = pRpcContext->code;
|
||||||
|
} else {
|
||||||
|
code = terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rsp;
|
||||||
|
rsp.handle = pRead->rpcMsg.handle;
|
||||||
|
rsp.code = code;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -19,31 +19,22 @@
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tsocket.h"
|
|
||||||
#include "tschemautil.h"
|
|
||||||
#include "textbuffer.h"
|
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "http.h"
|
|
||||||
#include "dnode.h"
|
|
||||||
#include "dnodeMgmt.h"
|
|
||||||
#include "dnodeRead.h"
|
|
||||||
#include "dnodeSystem.h"
|
#include "dnodeSystem.h"
|
||||||
#include "dnodeShell.h"
|
#include "dnodeRead.h"
|
||||||
#include "dnodeVnodeMgmt.h"
|
|
||||||
#include "dnodeWrite.h"
|
#include "dnodeWrite.h"
|
||||||
|
#include "dnodeShell.h"
|
||||||
|
|
||||||
static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn);
|
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
|
||||||
static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn);
|
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg);
|
||||||
static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn);
|
static void *tsDnodeShellRpc = NULL;
|
||||||
static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code);
|
|
||||||
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
|
||||||
|
|
||||||
static void *tsDnodeShellServer = NULL;
|
|
||||||
static int32_t tsDnodeQueryReqNum = 0;
|
|
||||||
static int32_t tsDnodeSubmitReqNum = 0;
|
|
||||||
|
|
||||||
int32_t dnodeInitShell() {
|
int32_t dnodeInitShell() {
|
||||||
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite;
|
||||||
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
|
||||||
|
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead;
|
||||||
|
|
||||||
|
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
|
||||||
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
|
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
|
||||||
if (numOfThreads < 1) {
|
if (numOfThreads < 1) {
|
||||||
numOfThreads = 1;
|
numOfThreads = 1;
|
||||||
|
@ -58,164 +49,47 @@ int32_t dnodeInitShell() {
|
||||||
rpcInit.cfp = dnodeProcessMsgFromShell;
|
rpcInit.cfp = dnodeProcessMsgFromShell;
|
||||||
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
|
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
|
||||||
rpcInit.connType = TAOS_CONN_SERVER;
|
rpcInit.connType = TAOS_CONN_SERVER;
|
||||||
rpcInit.idleTime = tsShellActivityTimer * 2000;
|
rpcInit.idleTime = tsShellActivityTimer * 1500;
|
||||||
rpcInit.afp = dnodeRetrieveUserAuthInfo;
|
|
||||||
|
|
||||||
tsDnodeShellServer = rpcOpen(&rpcInit);
|
tsDnodeShellRpc = rpcOpen(&rpcInit);
|
||||||
if (tsDnodeShellServer == NULL) {
|
if (tsDnodeShellRpc == NULL) {
|
||||||
dError("failed to init connection from shell");
|
dError("failed to init connection from shell");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dPrint("shell is opened");
|
dPrint("connection to shell is opened");
|
||||||
return TSDB_CODE_SUCCESS;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeCleanupShell() {
|
void dnodeCleanupShell() {
|
||||||
if (tsDnodeShellServer) {
|
if (tsDnodeShellRpc) {
|
||||||
rpcClose(tsDnodeShellServer);
|
rpcClose(tsDnodeShellRpc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDnodeStatisInfo dnodeGetStatisInfo() {
|
void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
|
||||||
SDnodeStatisInfo info = {0};
|
SRpcMsg rpcMsg;
|
||||||
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
|
|
||||||
info.httpReqNum = httpGetReqCount();
|
|
||||||
info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
|
|
||||||
info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return info;
|
rpcMsg.handle = pMsg->handle;
|
||||||
}
|
rpcMsg.pCont = NULL;
|
||||||
|
rpcMsg.contLen = 0;
|
||||||
|
|
||||||
static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
|
|
||||||
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
|
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
|
||||||
rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0);
|
dError("RPC %p, shell msg is ignored since dnode not running", pMsg->handle);
|
||||||
dTrace("query msg is ignored since dnode not running");
|
rpcMsg.code = TSDB_CODE_NOT_READY;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]);
|
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
|
||||||
|
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
|
||||||
if (msgType == TSDB_MSG_TYPE_QUERY) {
|
|
||||||
dnodeProcessQueryMsg(pCont, contLen, handle);
|
|
||||||
} else if (msgType == TSDB_MSG_TYPE_RETRIEVE) {
|
|
||||||
dnodeProcessRetrieveMsg(pCont, contLen, handle);
|
|
||||||
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
|
|
||||||
dnodeProcessSubmitMsg(pCont, contLen, handle);
|
|
||||||
} else {
|
} else {
|
||||||
dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]);
|
dError("RPC %p, msg:%s from shell is not handled", pMsg->handle, taosMsg[pMsg->msgType]);
|
||||||
|
rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO free may be cause segmentfault
|
|
||||||
// rpcFreeCont(pCont);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessQueryMsgCb(int32_t code, void *pQInfo, void *pConn) {
|
|
||||||
dTrace("conn:%p, query is returned, code:%d", pConn, code);
|
|
||||||
|
|
||||||
int32_t contLen = sizeof(SQueryTableRsp);
|
|
||||||
SQueryTableRsp *queryRsp = (SQueryTableRsp *) rpcMallocCont(contLen);
|
|
||||||
if (queryRsp == NULL) {
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
queryRsp->code = htonl(code);
|
|
||||||
queryRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, queryRsp, contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn) {
|
|
||||||
atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
|
|
||||||
SQueryTableMsg *pQuery = (SQueryTableMsg *) pCont;
|
|
||||||
dnodeQueryData(pQuery, pConn, dnodeProcessQueryMsgCb);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeProcessRetrieveMsgCb(int32_t code, void *pQInfo, void *pConn) {
|
|
||||||
dTrace("conn:%p, retrieve is returned, code:%d", pConn, code);
|
|
||||||
|
|
||||||
assert(pConn != NULL);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
rpcSendResponse(pConn, code, 0, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(pQInfo != NULL);
|
|
||||||
int32_t contLen = dnodeGetRetrieveDataSize(pQInfo);
|
|
||||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) rpcMallocCont(contLen);
|
|
||||||
if (pRetrieve == NULL) {
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = dnodeGetRetrieveData(pQInfo, pRetrieve);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_INVALID_QHANDLE, 0, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
|
|
||||||
pRetrieve->precision = htons(pRetrieve->precision);
|
|
||||||
pRetrieve->offset = htobe64(pRetrieve->offset);
|
|
||||||
pRetrieve->useconds = htobe64(pRetrieve->useconds);
|
|
||||||
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, pRetrieve, contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn) {
|
|
||||||
SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) pCont;
|
|
||||||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
|
||||||
pRetrieve->free = htons(pRetrieve->free);
|
|
||||||
|
|
||||||
dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveMsgCb);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeProcessSubmitMsgCb(SShellSubmitRspMsg *result, void *pConn) {
|
|
||||||
assert(result != NULL);
|
|
||||||
dTrace("conn:%p, submit is returned, code:%d", pConn, result->code);
|
|
||||||
|
|
||||||
if (result->code != 0) {
|
|
||||||
rpcSendResponse(pConn, result->code, NULL, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
|
|
||||||
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen);
|
|
||||||
if (submitRsp == NULL) {
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(submitRsp, result, contLen);
|
|
||||||
|
|
||||||
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
|
|
||||||
SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i];
|
|
||||||
if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) {
|
|
||||||
dnodeSendVnodeCfgMsg(block->vnode);
|
|
||||||
} else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) {
|
|
||||||
dnodeSendTableCfgMsg(block->vnode, block->sid);
|
|
||||||
}
|
|
||||||
block->index = htonl(block->index);
|
|
||||||
block->vnode = htonl(block->vnode);
|
|
||||||
block->sid = htonl(block->sid);
|
|
||||||
block->code = htonl(block->code);
|
|
||||||
}
|
|
||||||
submitRsp->code = htonl(submitRsp->code);
|
|
||||||
submitRsp->numOfRows = htonl(submitRsp->numOfRows);
|
|
||||||
submitRsp->affectedRows = htonl(submitRsp->affectedRows);
|
|
||||||
submitRsp->failedRows = htonl(submitRsp->failedRows);
|
|
||||||
submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks);
|
|
||||||
|
|
||||||
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, submitRsp, contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn) {
|
|
||||||
atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
|
|
||||||
|
|
||||||
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont;
|
|
||||||
dnodeWriteData(pSubmit, pConn, dnodeProcessSubmitMsgCb);
|
|
||||||
}
|
|
||||||
|
|
|
@ -25,12 +25,12 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
|
#include "trpc.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
#include "dnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
#include "dnodeModule.h"
|
#include "dnodeModule.h"
|
||||||
#include "dnodeShell.h"
|
#include "dnodeShell.h"
|
||||||
#include "dnodeSystem.h"
|
#include "dnodeSystem.h"
|
||||||
#include "dnodeVnodeMgmt.h"
|
|
||||||
|
|
||||||
#ifdef CLUSTER
|
#ifdef CLUSTER
|
||||||
#include "account.h"
|
#include "account.h"
|
||||||
|
@ -50,6 +50,11 @@ static int32_t dnodeInitRpcQHandle();
|
||||||
static int32_t dnodeInitQueryQHandle();
|
static int32_t dnodeInitQueryQHandle();
|
||||||
static int32_t dnodeInitTmrCtl();
|
static int32_t dnodeInitTmrCtl();
|
||||||
|
|
||||||
|
|
||||||
|
int32_t (*dnodeInitStorage)() = NULL;
|
||||||
|
void (*dnodeCleanupStorage)() = NULL;
|
||||||
|
int32_t (*dnodeInitPeers)(int32_t numOfThreads) = NULL;
|
||||||
|
|
||||||
void *tsDnodeTmr;
|
void *tsDnodeTmr;
|
||||||
void **tsRpcQhandle;
|
void **tsRpcQhandle;
|
||||||
void *tsDnodeMgmtQhandle;
|
void *tsDnodeMgmtQhandle;
|
||||||
|
@ -93,7 +98,7 @@ void dnodeCleanUpSystem() {
|
||||||
|
|
||||||
dnodeCleanupShell();
|
dnodeCleanupShell();
|
||||||
dnodeCleanUpModules();
|
dnodeCleanUpModules();
|
||||||
dnodeCleanupVnodes();
|
dnodeCleanupMgmt();
|
||||||
taosCloseLogger();
|
taosCloseLogger();
|
||||||
dnodeCleanupStorage();
|
dnodeCleanupStorage();
|
||||||
dnodeCleanVnodesLock();
|
dnodeCleanVnodesLock();
|
||||||
|
@ -154,7 +159,7 @@ int32_t dnodeInitSystem() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dnodeInitMgmtIp();
|
// dnodeInitMgmtIp();
|
||||||
|
|
||||||
tsPrintGlobalConfig();
|
tsPrintGlobalConfig();
|
||||||
|
|
||||||
|
@ -193,7 +198,7 @@ int32_t dnodeInitSystem() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dnodeOpenVnodes() < 0) {
|
if (dnodeInitMgmt() < 0) {
|
||||||
dError("failed to init vnode storage");
|
dError("failed to init vnode storage");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -246,12 +251,6 @@ int32_t dnodeInitStorageImp() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t (*dnodeInitStorage)() = dnodeInitStorageImp;
|
|
||||||
|
|
||||||
void dnodeCleanupStorageImp() {}
|
|
||||||
|
|
||||||
void (*dnodeCleanupStorage)() = dnodeCleanupStorageImp;
|
|
||||||
|
|
||||||
static int32_t dnodeInitQueryQHandle() {
|
static int32_t dnodeInitQueryQHandle() {
|
||||||
int32_t numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore;
|
int32_t numOfThreads = tsRatioOfQueryThreads * tsNumOfCores * tsNumOfThreadsPerCore;
|
||||||
if (numOfThreads < 1) {
|
if (numOfThreads < 1) {
|
||||||
|
@ -303,9 +302,3 @@ int32_t (*dnodeCheckSystem)() = dnodeCheckSystemImp;
|
||||||
int32_t dnodeInitPeersImp(int32_t numOfThreads) {
|
int32_t dnodeInitPeersImp(int32_t numOfThreads) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t (*dnodeInitPeers)(int32_t numOfThreads) = dnodeInitPeersImp;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,64 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "os.h"
|
|
||||||
#include "tlog.h"
|
|
||||||
#include "taoserror.h"
|
|
||||||
#include "dnodeVnodeMgmt.h"
|
|
||||||
|
|
||||||
int32_t dnodeOpenVnodes() {
|
|
||||||
dPrint("open all vnodes");
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeCleanupVnodes() {
|
|
||||||
dPrint("clean all vnodes");
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool dnodeCheckVnodeExist(int32_t vnode) {
|
|
||||||
dPrint("vnode:%d, check vnode exist", vnode);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) {
|
|
||||||
dPrint("vnode:%d, is created", htonl(pVnode->vnode));
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeDropVnode(int32_t vnode) {
|
|
||||||
dPrint("vnode:%d, is dropped", vnode);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* dnodeGetVnode(int32_t vnode) {
|
|
||||||
dPrint("vnode:%d, get vnode");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
|
|
||||||
dPrint("vnode:%d, get vnode status");
|
|
||||||
return TSDB_VN_STATUS_MASTER;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
|
|
||||||
dPrint("vnode:%d, sid:%d, check table exist");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeGetVnodesNum() {
|
|
||||||
return 1;
|
|
||||||
}
|
|
|
@ -17,82 +17,246 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tutil.h"
|
#include "trpc.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
#include "dnodeWrite.h"
|
#include "dnodeWrite.h"
|
||||||
#include "dnodeVnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
|
|
||||||
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) {
|
typedef struct {
|
||||||
dTrace("submit msg is disposed, affectrows:1");
|
int32_t code;
|
||||||
|
int32_t count; // number of vnodes returned result
|
||||||
|
int32_t numOfVnodes; // number of vnodes involved
|
||||||
|
} SRpcContext;
|
||||||
|
|
||||||
SShellSubmitRspMsg result = {0};
|
typedef struct _write {
|
||||||
|
void *pCont;
|
||||||
|
int contLen;
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
void *pVnode; // pointer to vnode
|
||||||
|
SRpcContext *pRpcContext; // RPC message context
|
||||||
|
} SWriteMsg;
|
||||||
|
|
||||||
int32_t numOfSid = htonl(pSubmit->numOfSid);
|
typedef struct {
|
||||||
if (numOfSid <= 0) {
|
taos_qset qset; // queue set
|
||||||
dError("invalid num of tables:%d", numOfSid);
|
pthread_t thread; // thread
|
||||||
result.code = TSDB_CODE_INVALID_QUERY_MSG;
|
int workerId; // worker ID
|
||||||
callback(&result, pConn);
|
} SWriteWorker;
|
||||||
|
|
||||||
|
typedef struct _thread_obj {
|
||||||
|
int max; // max number of workers
|
||||||
|
int nextId; // from 0 to max-1, cyclic
|
||||||
|
SWriteWorker *writeWorker;
|
||||||
|
} SWriteWorkerPool;
|
||||||
|
|
||||||
|
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *);
|
||||||
|
static void *dnodeProcessWriteQueue(void *param);
|
||||||
|
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
|
||||||
|
static void dnodeProcessWriteResult(SWriteMsg *pWrite);
|
||||||
|
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg);
|
||||||
|
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg);
|
||||||
|
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg);
|
||||||
|
|
||||||
|
SWriteWorkerPool wWorkerPool;
|
||||||
|
|
||||||
|
int dnodeInitWrite() {
|
||||||
|
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
|
||||||
|
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableMsg;
|
||||||
|
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessDropTableMsg;
|
||||||
|
|
||||||
|
wWorkerPool.max = tsNumOfCores;
|
||||||
|
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
||||||
|
if (wWorkerPool.writeWorker == NULL) return -1;
|
||||||
|
|
||||||
|
for (int i=0; i<wWorkerPool.max; ++i) {
|
||||||
|
wWorkerPool.writeWorker[i].workerId = i;
|
||||||
}
|
}
|
||||||
|
|
||||||
result.code = 0;
|
return 0;
|
||||||
result.numOfRows = 1;
|
|
||||||
result.affectedRows = 1;
|
|
||||||
result.numOfFailedBlocks = 0;
|
|
||||||
callback(&result, pConn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dnodeCreateTable(SDCreateTableMsg *pTable) {
|
void dnodeCleanupWrite() {
|
||||||
if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
|
|
||||||
dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId);
|
|
||||||
} else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){
|
free(wWorkerPool.writeWorker);
|
||||||
dTrace("table:%s, start to create normal table", pTable->tableId);
|
}
|
||||||
} else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){
|
|
||||||
dTrace("table:%s, start to create stream table", pTable->tableId);
|
void dnodeWrite(SRpcMsg *pMsg) {
|
||||||
|
int leftLen = pMsg->contLen;
|
||||||
|
char *pCont = (char *)pMsg->pCont;
|
||||||
|
int contLen = 0;
|
||||||
|
int numOfVnodes = 0;
|
||||||
|
int32_t vgId = 0;
|
||||||
|
SRpcContext *pRpcContext = NULL;
|
||||||
|
|
||||||
|
// parse head, get number of vnodes;
|
||||||
|
|
||||||
|
if ( numOfVnodes > 1) {
|
||||||
|
pRpcContext = calloc(sizeof(SRpcContext), 1);
|
||||||
|
pRpcContext->numOfVnodes = numOfVnodes;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (leftLen > 0) {
|
||||||
|
// todo: parse head, get vgId, contLen
|
||||||
|
|
||||||
|
// get pVnode from vgId
|
||||||
|
void *pVnode = dnodeGetVnode(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// put message into queue
|
||||||
|
SWriteMsg writeMsg;
|
||||||
|
writeMsg.rpcMsg = *pMsg;
|
||||||
|
writeMsg.pCont = pCont;
|
||||||
|
writeMsg.contLen = contLen;
|
||||||
|
writeMsg.pRpcContext = pRpcContext;
|
||||||
|
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
|
||||||
|
|
||||||
|
taos_queue queue = dnodeGetVnodeWworker(pVnode);
|
||||||
|
taosWriteQitem(queue, &writeMsg);
|
||||||
|
|
||||||
|
// next vnode
|
||||||
|
leftLen -= contLen;
|
||||||
|
pCont -= contLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *dnodeAllocateWriteWorker() {
|
||||||
|
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
|
||||||
|
|
||||||
|
if (pWorker->qset == NULL) {
|
||||||
|
pWorker->qset = taosOpenQset();
|
||||||
|
if (pWorker->qset == NULL) return NULL;
|
||||||
|
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
|
||||||
|
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
|
||||||
|
taosCloseQset(pWorker->qset);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
|
||||||
|
if (queue) {
|
||||||
|
taosAddIntoQset(pWorker->qset, queue);
|
||||||
|
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||||
|
}
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeFreeWriteWorker(void *wqueue) {
|
||||||
|
|
||||||
|
taosCloseQueue(wqueue);
|
||||||
|
|
||||||
|
// dynamically adjust the number of threads
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *dnodeProcessWriteQueue(void *param) {
|
||||||
|
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||||
|
taos_qall qall;
|
||||||
|
SWriteMsg writeMsg;
|
||||||
|
int numOfMsgs;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall);
|
||||||
|
if (numOfMsgs <=0) {
|
||||||
|
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i=0; i<numOfMsgs; ++i) {
|
||||||
|
// retrieve all items, and write them into WAL
|
||||||
|
taosGetQitem(qall, &writeMsg);
|
||||||
|
|
||||||
|
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
// flush WAL file
|
||||||
|
// walFsync(pVnode->whandle);
|
||||||
|
|
||||||
|
// browse all items, and process them one by one
|
||||||
|
taosResetQitems(qall);
|
||||||
|
for (int i=0; i<numOfMsgs; ++i) {
|
||||||
|
taosGetQitem(qall, &writeMsg);
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) {
|
||||||
|
(*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg);
|
||||||
} else {
|
} else {
|
||||||
dError("table:%s, invalid table type:%d", pTable->tableType);
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < pTable->numOfVPeers; ++i) {
|
dnodeProcessWriteResult(&writeMsg);
|
||||||
dTrace("table:%s ip:%s vnode:%d sid:%d", pTable->tableId, taosIpStr(pTable->vpeerDesc[i].ip),
|
|
||||||
pTable->vpeerDesc[i].vnode, pTable->sid);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchema *pSchema = (SSchema *) pTable->data;
|
// free the Qitems;
|
||||||
for (int32_t col = 0; col < pTable->numOfColumns; ++col) {
|
taosFreeQitems(qall);
|
||||||
dTrace("table:%s col index:%d colId:%d bytes:%d type:%d name:%s",
|
|
||||||
pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name);
|
|
||||||
pSchema++;
|
|
||||||
}
|
|
||||||
for (int32_t col = 0; col < pTable->numOfTags; ++col) {
|
|
||||||
dTrace("table:%s tag index:%d colId:%d bytes:%d type:%d name:%s",
|
|
||||||
pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name);
|
|
||||||
pSchema++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
static void dnodeProcessWriteResult(SWriteMsg *pWrite) {
|
||||||
* Remove table from local repository
|
SRpcContext *pRpcContext = pWrite->pRpcContext;
|
||||||
*/
|
int32_t code = 0;
|
||||||
int32_t dnodeDropTable(SDRemoveTableMsg *pTable) {
|
|
||||||
dPrint("table:%s, sid:%d is removed", pTable->tableId, pTable->sid);
|
dnodeReleaseVnode(pWrite->pVnode);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
|
if (pRpcContext) {
|
||||||
|
if (terrno) {
|
||||||
|
if (pRpcContext->code == 0) pRpcContext->code = terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = atomic_add_fetch_32(&pRpcContext->count, 1);
|
||||||
|
if (count < pRpcContext->numOfVnodes) {
|
||||||
|
// not over yet, multiple vnodes
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// over, result can be merged now
|
||||||
|
code = pRpcContext->code;
|
||||||
|
} else {
|
||||||
|
code = terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rsp;
|
||||||
|
rsp.handle = pWrite->rpcMsg.handle;
|
||||||
|
rsp.code = code;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
* Create stream
|
|
||||||
* if stream already exist, update it
|
int num = taosGetQueueNumber(pWorker->qset);
|
||||||
*/
|
|
||||||
int32_t dnodeCreateStream(SDAlterStreamMsg *pStream) {
|
if (num > 0) {
|
||||||
dPrint("stream:%s, is created, ", pStream->tableId);
|
usleep(100);
|
||||||
return TSDB_CODE_SUCCESS;
|
sched_yield();
|
||||||
|
} else {
|
||||||
|
taosCloseQset(pWorker->qset);
|
||||||
|
pWorker->qset = NULL;
|
||||||
|
pthread_exit(NULL);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
|
||||||
* Remove all child tables of supertable from local repository
|
|
||||||
*/
|
|
||||||
int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable) {
|
|
||||||
dPrint("stable:%s, is removed", pStable->tableId);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -28,8 +28,6 @@ int32_t mgmtInitShell();
|
||||||
void mgmtCleanUpShell();
|
void mgmtCleanUpShell();
|
||||||
|
|
||||||
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
|
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
|
||||||
extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
|
|
||||||
extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If table not exist, will create it
|
* If table not exist, will create it
|
||||||
|
|
|
@ -48,7 +48,7 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) {
|
||||||
void *ahandle = sched->ahandle;
|
void *ahandle = sched->ahandle;
|
||||||
int8_t *pCont = sched->msg;
|
int8_t *pCont = sched->msg;
|
||||||
|
|
||||||
dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code);
|
// dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
|
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
|
||||||
|
@ -157,13 +157,19 @@ static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t con
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
rpcSendResponse(info->thandle, code, NULL, 0);
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.code = code;
|
||||||
|
rpcMsg.handle = info->thandle;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
|
if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
|
||||||
mTrace("table:%s, start to process get meta", pTable->tableId);
|
mTrace("table:%s, start to process get meta", pTable->tableId);
|
||||||
mgmtProcessGetTableMeta(pTable, thandle);
|
mgmtProcessGetTableMeta(pTable, thandle);
|
||||||
} else {
|
} else {
|
||||||
rpcSendResponse(info->thandle, code, NULL, 0);
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.code = code;
|
||||||
|
rpcMsg.handle = info->thandle;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,7 +242,11 @@ static void mgmtProcessDnodeGrantMsg(void *pCont, void *thandle) {
|
||||||
mgmtUpdateGrantInfoFp(pCont);
|
mgmtUpdateGrantInfoFp(pCont);
|
||||||
mTrace("grant info is updated");
|
mTrace("grant info is updated");
|
||||||
}
|
}
|
||||||
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.code = TSDB_CODE_SUCCESS;
|
||||||
|
rpcMsg.handle = thandle;
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
|
||||||
|
@ -368,7 +378,7 @@ int32_t mgmtSendCfgDnodeMsg(char *cont) {
|
||||||
//#else
|
//#else
|
||||||
// (void)tsCfgDynamicOptions(pCfg->config);
|
// (void)tsCfgDynamicOptions(pCfg->config);
|
||||||
//#endif
|
//#endif
|
||||||
// return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mgmtInitDnodeInt() {
|
int32_t mgmtInitDnodeInt() {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -133,7 +133,7 @@ int32_t mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle
|
||||||
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
|
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
|
||||||
assert(pAcct != NULL);
|
assert(pAcct != NULL);
|
||||||
|
|
||||||
int32_t code = mgmtCheckTableLimit(pAcct, pCreate);
|
int32_t code = mgmtCheckTableLimit(pAcct, pCreate->numOfColumns);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId);
|
mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -359,7 +359,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
|
||||||
// connection type is application specific.
|
// connection type is application specific.
|
||||||
// for TDengine, all the query, show commands shall have TCP connection
|
// for TDengine, all the query, show commands shall have TCP connection
|
||||||
char type = pMsg->msgType;
|
char type = pMsg->msgType;
|
||||||
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
|
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
|
||||||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
|
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
|
||||||
type == TSDB_MSG_TYPE_SHOW )
|
type == TSDB_MSG_TYPE_SHOW )
|
||||||
pContext->connType = RPC_CONN_TCPC;
|
pContext->connType = RPC_CONN_TCPC;
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
cmake_minimum_required(VERSION 2.8)
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
add_subdirectory(common)
|
ADD_SUBDIRECTORY(common)
|
||||||
|
ADD_SUBDIRECTORY(tsdb)
|
||||||
add_subdirectory(tsdb)
|
# ENABLE_TESTING()
|
||||||
|
# ADD_SUBDIRECTORY(tests)
|
||||||
enable_testing()
|
|
||||||
|
|
||||||
# add_subdirectory(tests)
|
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST)
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
list(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/src/vnodePeer.c)
|
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||||
|
|
||||||
message(STATUS "Common source file ${SOURCE_LIST}")
|
INCLUDE_DIRECTORIES(inc)
|
||||||
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
add_library(common ${SOURCE_LIST})
|
ADD_LIBRARY(common ${SRC})
|
||||||
target_include_directories(common PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc")
|
ENDIF ()
|
||||||
|
|
|
@ -1,13 +1,18 @@
|
||||||
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/src SOURCE_LIST)
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
add_library(tsdb STATIC ${SOURCE_LIST})
|
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
target_link_libraries(tsdb common tutil)
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/common/inc)
|
||||||
|
|
||||||
target_include_directories(tsdb
|
INCLUDE_DIRECTORIES(inc)
|
||||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
PUBLIC "${CMAKE_SOURCE_DIR}/src/inc"
|
|
||||||
PUBLIC "${CMAKE_SOURCE_DIR}/src/util/inc"
|
|
||||||
PUBLIC "${CMAKE_SOURCE_DIR}/src/os/linux/inc"
|
|
||||||
)
|
|
||||||
|
|
||||||
add_subdirectory(tests)
|
ADD_LIBRARY(tsdb ${SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(tsdb common tutil)
|
||||||
|
|
||||||
|
# Someone has no gtest directory, so comment it
|
||||||
|
#ADD_SUBDIRECTORY(tests)
|
||||||
|
ENDIF ()
|
||||||
|
|
Loading…
Reference in New Issue