Merge remote-tracking branch 'origin/2.0' into refact/slguan
This commit is contained in:
commit
260856fb0f
|
@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
|
||||||
void rpcSendResponse(SRpcMsg *pMsg);
|
void rpcSendResponse(SRpcMsg *pMsg);
|
||||||
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
|
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
|
||||||
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,9 @@ typedef struct {
|
||||||
int8_t oldInUse; // server IP inUse passed by app
|
int8_t oldInUse; // server IP inUse passed by app
|
||||||
int8_t redirect; // flag to indicate redirect
|
int8_t redirect; // flag to indicate redirect
|
||||||
int8_t connType; // connection type
|
int8_t connType; // connection type
|
||||||
|
SRpcMsg *pRsp; // for synchronous API
|
||||||
|
tsem_t *pSem; // for synchronous API
|
||||||
|
SRpcIpSet *pSet; // for synchronous API
|
||||||
char msg[0]; // RpcHead starts from here
|
char msg[0]; // RpcHead starts from here
|
||||||
} SRpcReqContext;
|
} SRpcReqContext;
|
||||||
|
|
||||||
|
@ -183,6 +186,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
|
||||||
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
|
static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
|
||||||
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
|
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
|
||||||
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
|
static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
|
||||||
|
static void rpcSendReqHead(SRpcConn *pConn);
|
||||||
|
|
||||||
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
|
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
|
||||||
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
|
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
|
||||||
|
@ -415,12 +419,12 @@ void rpcSendResponse(SRpcMsg *pMsg) {
|
||||||
rpcFreeMsg(pConn->pRspMsg);
|
rpcFreeMsg(pConn->pRspMsg);
|
||||||
pConn->pRspMsg = msg;
|
pConn->pRspMsg = msg;
|
||||||
pConn->rspMsgLen = msgLen;
|
pConn->rspMsgLen = msgLen;
|
||||||
if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--;
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
|
|
||||||
taosTmrStopA(&pConn->pTimer);
|
taosTmrStopA(&pConn->pTimer);
|
||||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
// taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||||
pConn->secured = 1; // connection shall be secured
|
pConn->secured = 1; // connection shall be secured
|
||||||
|
|
||||||
|
@ -456,6 +460,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
|
SRpcReqContext *pContext;
|
||||||
|
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||||
|
|
||||||
|
memset(pRsp, 0, sizeof(SRpcMsg));
|
||||||
|
|
||||||
|
tsem_t sem;
|
||||||
|
tsem_init(&sem, 0, 0);
|
||||||
|
pContext->pSem = &sem;
|
||||||
|
pContext->pRsp = pRsp;
|
||||||
|
pContext->pSet = pIpSet;
|
||||||
|
|
||||||
|
rpcSendRequest(shandle, pIpSet, pMsg);
|
||||||
|
|
||||||
|
tsem_wait(&sem);
|
||||||
|
tsem_destroy(&sem);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
static void rpcFreeMsg(void *msg) {
|
static void rpcFreeMsg(void *msg) {
|
||||||
if ( msg ) {
|
if ( msg ) {
|
||||||
char *temp = (char *)msg - sizeof(SRpcReqContext);
|
char *temp = (char *)msg - sizeof(SRpcReqContext);
|
||||||
|
@ -661,8 +685,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
|
|
||||||
if (pConn->inTranId == pHead->tranId) {
|
if (pConn->inTranId == pHead->tranId) {
|
||||||
if (pConn->inType == pHead->msgType) {
|
if (pConn->inType == pHead->msgType) {
|
||||||
|
if (pHead->code == 0) {
|
||||||
tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
|
tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
|
||||||
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
|
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
|
||||||
|
} else {
|
||||||
|
// do nothing, it is heart beat from client
|
||||||
|
}
|
||||||
} else if (pConn->inType == 0) {
|
} else if (pConn->inType == 0) {
|
||||||
tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn,
|
tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn,
|
||||||
taosMsg[pHead->msgType], pConn->inTranId);
|
taosMsg[pHead->msgType], pConn->inTranId);
|
||||||
|
@ -703,22 +731,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
return TSDB_CODE_INVALID_RESPONSE_TYPE;
|
return TSDB_CODE_INVALID_RESPONSE_TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (*pHead->content == TSDB_CODE_NOT_READY) {
|
if (pHead->code == TSDB_CODE_NOT_READY) {
|
||||||
return TSDB_CODE_ALREADY_PROCESSED;
|
return TSDB_CODE_ALREADY_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrStopA(&pConn->pTimer);
|
taosTmrStopA(&pConn->pTimer);
|
||||||
pConn->retry = 0;
|
pConn->retry = 0;
|
||||||
|
|
||||||
if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
if (pConn->tretry <= tsRpcMaxRetry) {
|
if (pConn->tretry <= tsRpcMaxRetry) {
|
||||||
pConn->tretry++;
|
|
||||||
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
|
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
|
||||||
taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
pConn->tretry++;
|
||||||
|
rpcSendReqHead(pConn);
|
||||||
|
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
|
||||||
return TSDB_CODE_ALREADY_PROCESSED;
|
return TSDB_CODE_ALREADY_PROCESSED;
|
||||||
} else {
|
} else {
|
||||||
// peer still in processing, give up
|
// peer still in processing, give up
|
||||||
*pHead->content = TSDB_CODE_TOO_SLOW;
|
return TSDB_CODE_TOO_SLOW;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -779,6 +808,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
if ( rpcIsReq(pHead->msgType) ) {
|
if ( rpcIsReq(pHead->msgType) ) {
|
||||||
terrno = rpcProcessReqHead(pConn, pHead);
|
terrno = rpcProcessReqHead(pConn, pHead);
|
||||||
pConn->connType = pRecv->connType;
|
pConn->connType = pRecv->connType;
|
||||||
|
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||||
} else {
|
} else {
|
||||||
terrno = rpcProcessRspHead(pConn, pHead);
|
terrno = rpcProcessRspHead(pConn, pHead);
|
||||||
}
|
}
|
||||||
|
@ -801,6 +831,18 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
||||||
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
|
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pConn->inType) {
|
||||||
|
// if there are pending request, notify the app
|
||||||
|
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
rpcMsg.pCont = NULL;
|
||||||
|
rpcMsg.contLen = 0;
|
||||||
|
rpcMsg.handle = pConn;
|
||||||
|
rpcMsg.msgType = pConn->inType;
|
||||||
|
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
|
(*(pRpc->cfp))(&rpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
rpcCloseConn(pConn);
|
rpcCloseConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -824,7 +866,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
pConn = rpcProcessMsgHead(pRpc, pRecv);
|
pConn = rpcProcessMsgHead(pRpc, pRecv);
|
||||||
|
|
||||||
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
|
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
|
||||||
tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d",
|
tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d",
|
||||||
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
|
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
|
||||||
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
|
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port);
|
||||||
}
|
}
|
||||||
|
@ -845,6 +887,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
||||||
|
SRpcInfo *pRpc = pContext->pRpc;
|
||||||
|
|
||||||
|
if (pContext->pRsp) {
|
||||||
|
// for synchronous API
|
||||||
|
tsem_post(pContext->pSem);
|
||||||
|
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
|
||||||
|
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
|
||||||
|
} else {
|
||||||
|
// for asynchronous API
|
||||||
|
if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect))
|
||||||
|
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
|
||||||
|
|
||||||
|
(*pRpc->cfp)(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// free the request message
|
||||||
|
rpcFreeCont(pContext->pCont);
|
||||||
|
}
|
||||||
|
|
||||||
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
|
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
|
@ -877,10 +939,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
|
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
} else {
|
} else {
|
||||||
if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) )
|
rpcNotifyClient(pContext, &rpcMsg);
|
||||||
(*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet
|
|
||||||
(*pRpc->cfp)(&rpcMsg);
|
|
||||||
rpcFreeCont(pContext->pCont); // free the request msg
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -894,7 +953,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
|
||||||
pHead = (SRpcHead *)msg;
|
pHead = (SRpcHead *)msg;
|
||||||
pHead->version = 1;
|
pHead->version = 1;
|
||||||
pHead->msgType = pConn->inType+1;
|
pHead->msgType = pConn->inType+1;
|
||||||
pHead->spi = 0;
|
pHead->spi = pConn->spi;
|
||||||
pHead->encrypt = 0;
|
pHead->encrypt = 0;
|
||||||
pHead->tranId = pConn->inTranId;
|
pHead->tranId = pConn->inTranId;
|
||||||
pHead->sourceId = pConn->ownId;
|
pHead->sourceId = pConn->ownId;
|
||||||
|
@ -903,7 +962,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) {
|
||||||
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
|
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
|
||||||
pHead->code = htonl(code);
|
pHead->code = htonl(code);
|
||||||
|
|
||||||
rpcSendMsgToPeer(pConn, msg, 0);
|
rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
|
||||||
|
pConn->secured = 1; // connection shall be secured
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcSendReqHead(SRpcConn *pConn) {
|
||||||
|
char msg[RPC_MSG_OVERHEAD];
|
||||||
|
SRpcHead *pHead;
|
||||||
|
|
||||||
|
// set msg header
|
||||||
|
memset(msg, 0, sizeof(SRpcHead));
|
||||||
|
pHead = (SRpcHead *)msg;
|
||||||
|
pHead->version = 1;
|
||||||
|
pHead->msgType = pConn->outType;
|
||||||
|
pHead->spi = pConn->spi;
|
||||||
|
pHead->encrypt = 0;
|
||||||
|
pHead->tranId = pConn->outTranId;
|
||||||
|
pHead->sourceId = pConn->ownId;
|
||||||
|
pHead->destId = pConn->peerId;
|
||||||
|
pHead->linkUid = pConn->linkUid;
|
||||||
|
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
|
||||||
|
pHead->code = 1;
|
||||||
|
|
||||||
|
rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
|
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
|
||||||
|
@ -999,9 +1080,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
|
||||||
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
||||||
} else {
|
} else {
|
||||||
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
|
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
|
||||||
tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d",
|
tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
|
||||||
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
|
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
|
||||||
(uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
|
||||||
}
|
}
|
||||||
|
|
||||||
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
|
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
|
||||||
|
@ -1027,8 +1108,8 @@ static void rpcProcessConnError(void *param, void *id) {
|
||||||
rpcMsg.code = pContext->code;
|
rpcMsg.code = pContext->code;
|
||||||
rpcMsg.pCont = NULL;
|
rpcMsg.pCont = NULL;
|
||||||
rpcMsg.contLen = 0;
|
rpcMsg.contLen = 0;
|
||||||
(*(pRpc->cfp))(&rpcMsg);
|
|
||||||
rpcFreeCont(pContext->pCont); // free the request msg
|
rpcNotifyClient(pContext, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
// move to next IP
|
// move to next IP
|
||||||
pContext->ipSet.inUse++;
|
pContext->ipSet.inUse++;
|
||||||
|
@ -1079,6 +1160,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
|
||||||
|
|
||||||
if (pConn->user[0]) {
|
if (pConn->user[0]) {
|
||||||
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
|
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
|
||||||
|
if (pConn->inType && pRpc->cfp) {
|
||||||
|
// if there are pending request, notify the app
|
||||||
|
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
rpcMsg.pCont = NULL;
|
||||||
|
rpcMsg.contLen = 0;
|
||||||
|
rpcMsg.handle = pConn;
|
||||||
|
rpcMsg.msgType = pConn->inType;
|
||||||
|
rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL;
|
||||||
|
(*(pRpc->cfp))(&rpcMsg);
|
||||||
|
}
|
||||||
rpcCloseConn(pConn);
|
rpcCloseConn(pConn);
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
|
tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
|
||||||
|
|
|
@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
ADD_EXECUTABLE(rclient ${CLIENT_SRC})
|
ADD_EXECUTABLE(rclient ${CLIENT_SRC})
|
||||||
TARGET_LINK_LIBRARIES(rclient trpc)
|
TARGET_LINK_LIBRARIES(rclient trpc)
|
||||||
|
|
||||||
|
LIST(APPEND SCLIENT_SRC ./rsclient.c)
|
||||||
|
ADD_EXECUTABLE(rsclient ${SCLIENT_SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(rsclient trpc)
|
||||||
|
|
||||||
LIST(APPEND SERVER_SRC ./rserver.c)
|
LIST(APPEND SERVER_SRC ./rserver.c)
|
||||||
ADD_EXECUTABLE(rserver ${SERVER_SRC})
|
ADD_EXECUTABLE(rserver ${SERVER_SRC})
|
||||||
TARGET_LINK_LIBRARIES(rserver trpc)
|
TARGET_LINK_LIBRARIES(rserver trpc)
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct {
|
||||||
void *pRpc;
|
void *pRpc;
|
||||||
} SInfo;
|
} SInfo;
|
||||||
|
|
||||||
void processResponse(SRpcMsg *pMsg) {
|
static void processResponse(SRpcMsg *pMsg) {
|
||||||
SInfo *pInfo = (SInfo *)pMsg->handle;
|
SInfo *pInfo = (SInfo *)pMsg->handle;
|
||||||
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
|
tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code);
|
||||||
|
|
||||||
|
@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) {
|
||||||
sem_post(&pInfo->rspSem);
|
sem_post(&pInfo->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
|
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
|
||||||
SInfo *pInfo = (SInfo *)handle;
|
SInfo *pInfo = (SInfo *)handle;
|
||||||
|
|
||||||
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
|
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
|
||||||
pInfo->ipSet = *pIpSet;
|
pInfo->ipSet = *pIpSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tcount = 0;
|
static int tcount = 0;
|
||||||
|
|
||||||
void *sendRequest(void *param) {
|
static void *sendRequest(void *param) {
|
||||||
SInfo *pInfo = (SInfo *)param;
|
SInfo *pInfo = (SInfo *)param;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,212 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <signal.h>
|
||||||
|
#include <semaphore.h>
|
||||||
|
#include "os.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int index;
|
||||||
|
SRpcIpSet ipSet;
|
||||||
|
int num;
|
||||||
|
int numOfReqs;
|
||||||
|
int msgSize;
|
||||||
|
sem_t rspSem;
|
||||||
|
sem_t *pOverSem;
|
||||||
|
pthread_t thread;
|
||||||
|
void *pRpc;
|
||||||
|
} SInfo;
|
||||||
|
|
||||||
|
static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) {
|
||||||
|
SInfo *pInfo = (SInfo *)handle;
|
||||||
|
|
||||||
|
tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse);
|
||||||
|
pInfo->ipSet = *pIpSet;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tcount = 0;
|
||||||
|
static int terror = 0;
|
||||||
|
|
||||||
|
static void *sendRequest(void *param) {
|
||||||
|
SInfo *pInfo = (SInfo *)param;
|
||||||
|
SRpcMsg rpcMsg, rspMsg;
|
||||||
|
|
||||||
|
tTrace("thread:%d, start to send request", pInfo->index);
|
||||||
|
|
||||||
|
while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) {
|
||||||
|
pInfo->num++;
|
||||||
|
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
|
||||||
|
rpcMsg.contLen = pInfo->msgSize;
|
||||||
|
rpcMsg.handle = pInfo;
|
||||||
|
rpcMsg.msgType = 1;
|
||||||
|
tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
|
||||||
|
|
||||||
|
rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg);
|
||||||
|
|
||||||
|
// handle response
|
||||||
|
if (rspMsg.code != 0) terror++;
|
||||||
|
|
||||||
|
tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code);
|
||||||
|
|
||||||
|
rpcFreeCont(rspMsg.pCont);
|
||||||
|
|
||||||
|
if ( pInfo->num % 20000 == 0 )
|
||||||
|
tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num);
|
||||||
|
}
|
||||||
|
|
||||||
|
tTrace("thread:%d, it is over", pInfo->index);
|
||||||
|
tcount++;
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
SRpcInit rpcInit;
|
||||||
|
SRpcIpSet ipSet;
|
||||||
|
int msgSize = 128;
|
||||||
|
int numOfReqs = 0;
|
||||||
|
int appThreads = 1;
|
||||||
|
char serverIp[40] = "127.0.0.1";
|
||||||
|
struct timeval systemTime;
|
||||||
|
int64_t startTime, endTime;
|
||||||
|
pthread_attr_t thattr;
|
||||||
|
|
||||||
|
// server info
|
||||||
|
ipSet.numOfIps = 1;
|
||||||
|
ipSet.inUse = 0;
|
||||||
|
ipSet.port = 7000;
|
||||||
|
ipSet.ip[0] = inet_addr(serverIp);
|
||||||
|
ipSet.ip[1] = inet_addr("192.168.0.1");
|
||||||
|
|
||||||
|
// client info
|
||||||
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
|
rpcInit.localIp = "0.0.0.0";
|
||||||
|
rpcInit.localPort = 0;
|
||||||
|
rpcInit.label = "APP";
|
||||||
|
rpcInit.numOfThreads = 1;
|
||||||
|
// rpcInit.cfp = processResponse;
|
||||||
|
rpcInit.ufp = processUpdateIpSet;
|
||||||
|
rpcInit.sessions = 100;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer*1000;
|
||||||
|
rpcInit.user = "michael";
|
||||||
|
rpcInit.secret = "mypassword";
|
||||||
|
rpcInit.ckey = "key";
|
||||||
|
rpcInit.spi = 1;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
||||||
|
for (int i=1; i<argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
|
ipSet.port = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
||||||
|
ipSet.ip[0] = inet_addr(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
||||||
|
strcpy(rpcInit.localIp, argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||||
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||||
|
msgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||||
|
rpcInit.sessions = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
|
||||||
|
numOfReqs = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-a")==0 && i < argc-1) {
|
||||||
|
appThreads = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
|
||||||
|
tsCompressMsgSize = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-u")==0 && i < argc-1) {
|
||||||
|
rpcInit.user = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "-k")==0 && i < argc-1) {
|
||||||
|
rpcInit.secret = argv[++i];
|
||||||
|
} else if (strcmp(argv[i], "-spi")==0 && i < argc-1) {
|
||||||
|
rpcInit.spi = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||||
|
rpcDebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-i ip]: first server IP address, default is:%s\n", serverIp);
|
||||||
|
printf(" [-p port]: server port number, default is:%d\n", ipSet.port);
|
||||||
|
printf(" [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
|
||||||
|
printf(" [-s sessions]: number of rpc sessions, default is:%d\n", rpcInit.sessions);
|
||||||
|
printf(" [-l localIp]: local IP address, default is:%s\n", rpcInit.localIp);
|
||||||
|
printf(" [-m msgSize]: message body size, default is:%d\n", msgSize);
|
||||||
|
printf(" [-a threads]: number of app threads, default is:%d\n", appThreads);
|
||||||
|
printf(" [-n requests]: number of requests per thread, default is:%d\n", numOfReqs);
|
||||||
|
printf(" [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
|
||||||
|
printf(" [-u user]: user name for the connection, default is:%s\n", rpcInit.user);
|
||||||
|
printf(" [-k secret]: password for the connection, default is:%s\n", rpcInit.secret);
|
||||||
|
printf(" [-spi SPI]: security parameter index, default is:%d\n", rpcInit.spi);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("client.log", 100000, 10);
|
||||||
|
|
||||||
|
void *pRpc = rpcOpen(&rpcInit);
|
||||||
|
if (pRpc == NULL) {
|
||||||
|
dError("failed to initialize RPC");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tPrint("client is initialized");
|
||||||
|
|
||||||
|
gettimeofday(&systemTime, NULL);
|
||||||
|
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
|
||||||
|
|
||||||
|
SInfo *pInfo = (SInfo *)calloc(1, sizeof(SInfo)*appThreads);
|
||||||
|
|
||||||
|
pthread_attr_init(&thattr);
|
||||||
|
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
for (int i=0; i<appThreads; ++i) {
|
||||||
|
pInfo->index = i;
|
||||||
|
pInfo->ipSet = ipSet;
|
||||||
|
pInfo->numOfReqs = numOfReqs;
|
||||||
|
pInfo->msgSize = msgSize;
|
||||||
|
sem_init(&pInfo->rspSem, 0, 0);
|
||||||
|
pInfo->pRpc = pRpc;
|
||||||
|
pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo);
|
||||||
|
pInfo++;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
usleep(1);
|
||||||
|
} while ( tcount < appThreads);
|
||||||
|
|
||||||
|
gettimeofday(&systemTime, NULL);
|
||||||
|
endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
|
||||||
|
float usedTime = (endTime - startTime)/1000.0; // mseconds
|
||||||
|
|
||||||
|
tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror);
|
||||||
|
tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize);
|
||||||
|
|
||||||
|
taosCloseLogger();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ void processShellMsg() {
|
||||||
rpcMsg.pCont = rpcMallocCont(msgSize);
|
rpcMsg.pCont = rpcMallocCont(msgSize);
|
||||||
rpcMsg.contLen = msgSize;
|
rpcMsg.contLen = msgSize;
|
||||||
rpcMsg.handle = pRpcMsg->handle;
|
rpcMsg.handle = pRpcMsg->handle;
|
||||||
rpcMsg.code = 1;
|
rpcMsg.code = 0;
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
taosFreeQitem(pRpcMsg);
|
taosFreeQitem(pRpcMsg);
|
||||||
|
@ -126,9 +126,10 @@ void processRequestMsg(SRpcMsg *pMsg) {
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
char dataName[20] = "server.data";
|
char dataName[20] = "server.data";
|
||||||
|
char localIp[40] = "0.0.0.0";
|
||||||
|
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localIp = "0.0.0.0";
|
rpcInit.localIp = localIp;
|
||||||
rpcInit.localPort = 7000;
|
rpcInit.localPort = 7000;
|
||||||
rpcInit.label = "SER";
|
rpcInit.label = "SER";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
|
@ -201,5 +202,3 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,15 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(inc)
|
||||||
|
|
||||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
|
||||||
|
|
||||||
ADD_LIBRARY(wal ${SRC})
|
ADD_LIBRARY(twal ${SRC})
|
||||||
TARGET_INCLUDE_DIRECTORIES(wal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc)
|
TARGET_LINK_LIBRARIES(twal tutil)
|
||||||
|
|
||||||
|
ADD_SUBDIRECTORY(test)
|
||||||
|
|
||||||
|
|
|
@ -14,19 +14,37 @@
|
||||||
*/
|
*/
|
||||||
#ifndef _TD_WAL_H_
|
#ifndef _TD_WAL_H_
|
||||||
#define _TD_WAL_H_
|
#define _TD_WAL_H_
|
||||||
#include <stdint.h>
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef void walh; // WAL HANDLE
|
#define TAOS_WAL_NOLOG 0
|
||||||
|
#define TAOS_WAL_WRITE 1
|
||||||
|
#define TAOS_WAL_FSYNC 2
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t msgType;
|
||||||
|
int8_t reserved[3];
|
||||||
|
int32_t len;
|
||||||
|
uint64_t version;
|
||||||
|
uint32_t signature;
|
||||||
|
uint32_t cksum;
|
||||||
|
char cont[];
|
||||||
|
} SWalHead;
|
||||||
|
|
||||||
|
typedef void* twal_h; // WAL HANDLE
|
||||||
|
|
||||||
|
twal_h walOpen(char *path, int max, int level);
|
||||||
|
void walClose(twal_h);
|
||||||
|
int walRenew(twal_h);
|
||||||
|
int walWrite(twal_h, SWalHead *);
|
||||||
|
void walFsync(twal_h);
|
||||||
|
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead));
|
||||||
|
int walGetWalFile(twal_h, char *name, uint32_t *index);
|
||||||
|
|
||||||
|
extern int wDebugFlag;
|
||||||
|
|
||||||
walh *vnodeOpenWal(int vnode, uint8_t op);
|
|
||||||
int vnodeCloseWal(walh *pWal);
|
|
||||||
int vnodeRenewWal(walh *pWal);
|
|
||||||
int vnodeWriteWal(walh *pWal, void *cont, int contLen);
|
|
||||||
int vnodeSyncWal(walh *pWal);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
|
@ -1,27 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#include <stdlib.h>
|
|
||||||
|
|
||||||
#include "vnodeWal.h"
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
/* TODO */
|
|
||||||
} SWal;
|
|
||||||
|
|
||||||
walh *vnodeOpenWal(int vnode, uint8_t op) { return NULL; }
|
|
||||||
int vnodeCloseWal(walh *pWal) { return 0; }
|
|
||||||
int vnodeRenewWal(walh *pWal) { return 0; }
|
|
||||||
int vnodeWriteWal(walh *pWal, void *cont, int contLen) { return 0; }
|
|
||||||
int vnodeSyncWal(walh *pWal) { return 0; }
|
|
|
@ -0,0 +1,357 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <dirent.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "twal.h"
|
||||||
|
|
||||||
|
#define walPrefix "wal"
|
||||||
|
#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);}
|
||||||
|
#define wWarn(...) if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);}
|
||||||
|
#define wTrace(...) if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);}
|
||||||
|
#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int fd;
|
||||||
|
int level;
|
||||||
|
int max; // maximum number of wal files
|
||||||
|
uint32_t id; // increase continuously
|
||||||
|
int num; // number of wal files
|
||||||
|
char path[TSDB_FILENAME_LEN];
|
||||||
|
char name[TSDB_FILENAME_LEN];
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
} SWal;
|
||||||
|
|
||||||
|
int wDebugFlag = 135;
|
||||||
|
|
||||||
|
static uint32_t walSignature = 0xFAFBFDFE;
|
||||||
|
static int walHandleExistingFiles(char *path);
|
||||||
|
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *));
|
||||||
|
static int walRemoveWalFiles(char *path);
|
||||||
|
|
||||||
|
void *walOpen(char *path, int max, int level) {
|
||||||
|
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||||
|
if (pWal == NULL) return NULL;
|
||||||
|
|
||||||
|
pWal->fd = -1;
|
||||||
|
pWal->max = max;
|
||||||
|
pWal->id = 0;
|
||||||
|
pWal->num = 0;
|
||||||
|
pWal->level = level;
|
||||||
|
strcpy(pWal->path, path);
|
||||||
|
pthread_mutex_init(&pWal->mutex, NULL);
|
||||||
|
|
||||||
|
if (access(path, F_OK) != 0) mkdir(path, 0755);
|
||||||
|
|
||||||
|
if (walHandleExistingFiles(path) == 0)
|
||||||
|
walRenew(pWal);
|
||||||
|
|
||||||
|
if (pWal->fd <0) {
|
||||||
|
wError("wal:%s, failed to open", path);
|
||||||
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
|
free(pWal);
|
||||||
|
pWal = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pWal;
|
||||||
|
}
|
||||||
|
|
||||||
|
void walClose(void *handle) {
|
||||||
|
|
||||||
|
SWal *pWal = (SWal *)handle;
|
||||||
|
|
||||||
|
close(pWal->fd);
|
||||||
|
|
||||||
|
// remove all files in the directory
|
||||||
|
for (int i=0; i<pWal->num; ++i) {
|
||||||
|
sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id-i);
|
||||||
|
if (remove(pWal->name) <0) {
|
||||||
|
wError("wal:%s, failed to remove", pWal->name);
|
||||||
|
} else {
|
||||||
|
wTrace("wal:%s, it is removed", pWal->name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
|
|
||||||
|
free(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
int walRenew(twal_h handle) {
|
||||||
|
SWal *pWal = (SWal *)handle;
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
|
if (pWal->fd >=0) {
|
||||||
|
close(pWal->fd);
|
||||||
|
pWal->id++;
|
||||||
|
wTrace("wal:%s, it is closed", pWal->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
pWal->num++;
|
||||||
|
|
||||||
|
sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id);
|
||||||
|
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
|
|
||||||
|
if (pWal->fd < 0) {
|
||||||
|
wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
|
||||||
|
code = -1;
|
||||||
|
} else {
|
||||||
|
wTrace("wal:%s, it is created", pWal->name);
|
||||||
|
|
||||||
|
if (pWal->num > pWal->max) {
|
||||||
|
// remove the oldest wal file
|
||||||
|
char name[TSDB_FILENAME_LEN];
|
||||||
|
sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
|
||||||
|
if (remove(name) <0) {
|
||||||
|
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||||
|
} else {
|
||||||
|
wTrace("wal:%s, it is removed", name);
|
||||||
|
}
|
||||||
|
|
||||||
|
pWal->num--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walWrite(void *handle, SWalHead *pHead) {
|
||||||
|
SWal *pWal = (SWal *)handle;
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
// no wal
|
||||||
|
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||||
|
|
||||||
|
pHead->signature = walSignature;
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal));
|
||||||
|
int contLen = pHead->len + sizeof(SWalHead);
|
||||||
|
|
||||||
|
if(write(pWal->fd, pHead, contLen) != contLen) {
|
||||||
|
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void walFsync(void *handle) {
|
||||||
|
|
||||||
|
SWal *pWal = (SWal *)handle;
|
||||||
|
|
||||||
|
if (pWal->level == TAOS_WAL_FSYNC)
|
||||||
|
fsync(pWal->fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
|
||||||
|
SWal *pWal = (SWal *)handle;
|
||||||
|
int code = 0;
|
||||||
|
struct dirent *ent;
|
||||||
|
int count = 0;
|
||||||
|
uint32_t maxId = 0, minId = -1, index =0;
|
||||||
|
|
||||||
|
int plen = strlen(walPrefix);
|
||||||
|
char opath[TSDB_FILENAME_LEN];
|
||||||
|
sprintf(opath, "%s/old", pWal->path);
|
||||||
|
|
||||||
|
// is there old directory?
|
||||||
|
if (access(opath, F_OK)) return 0;
|
||||||
|
|
||||||
|
DIR *dir = opendir(opath);
|
||||||
|
while ((ent = readdir(dir))!= NULL) {
|
||||||
|
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
|
index = atol(ent->d_name + plen);
|
||||||
|
if (index > maxId) maxId = index;
|
||||||
|
if (index < minId) minId = index;
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( count != (maxId-minId+1) ) {
|
||||||
|
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
|
||||||
|
code = -1;
|
||||||
|
} else {
|
||||||
|
wTrace("wal:%s, %d files will be restored", opath, count);
|
||||||
|
|
||||||
|
for (index = minId; index<=maxId; ++index) {
|
||||||
|
sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index);
|
||||||
|
code = walRestoreWalFile(pWal->name, pVnode, writeFp);
|
||||||
|
if (code < 0) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
|
code = walRemoveWalFiles(opath);
|
||||||
|
if (code == 0) {
|
||||||
|
if (remove(opath) < 0) {
|
||||||
|
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
closedir(dir);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
|
SWal *pWal = (SWal *)handle;
|
||||||
|
int code = 1;
|
||||||
|
int32_t first = 0;
|
||||||
|
|
||||||
|
name[0] = 0;
|
||||||
|
if (pWal == NULL || pWal->num == 0) return 0;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&(pWal->mutex));
|
||||||
|
|
||||||
|
first = pWal->id + 1 - pWal->num;
|
||||||
|
if (*index == 0) *index = first; // set to first one
|
||||||
|
|
||||||
|
if (*index < first && *index > pWal->id) {
|
||||||
|
code = -1; // index out of range
|
||||||
|
} else {
|
||||||
|
sprintf(name, "%s/%s%d", pWal->path, walPrefix, *index);
|
||||||
|
code = (*index == pWal->id) ? 0:1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&(pWal->mutex));
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) {
|
||||||
|
SWalHead walHead;
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
int fd = open(name, O_RDONLY);
|
||||||
|
if (fd < 0) {
|
||||||
|
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
wTrace("wal:%s, start to restore", name);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int ret = read(fd, &walHead, sizeof(walHead));
|
||||||
|
if ( ret == 0) { code = 0; break;}
|
||||||
|
|
||||||
|
if (ret != sizeof(walHead)) {
|
||||||
|
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) {
|
||||||
|
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *buffer = malloc(sizeof(SWalHead) + walHead.len);
|
||||||
|
memcpy(buffer, &walHead, sizeof(walHead));
|
||||||
|
|
||||||
|
ret = read(fd, buffer+sizeof(walHead), walHead.len);
|
||||||
|
if ( ret != walHead.len) {
|
||||||
|
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// write into queue
|
||||||
|
(*writeFp)(pVnode, buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walHandleExistingFiles(char *path) {
|
||||||
|
int code = 0;
|
||||||
|
char oname[TSDB_FILENAME_LEN];
|
||||||
|
char nname[TSDB_FILENAME_LEN];
|
||||||
|
char opath[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
sprintf(opath, "%s/old", path);
|
||||||
|
|
||||||
|
struct dirent *ent;
|
||||||
|
DIR *dir = opendir(path);
|
||||||
|
int plen = strlen(walPrefix);
|
||||||
|
|
||||||
|
if (access(opath, F_OK) == 0) {
|
||||||
|
// old directory is there, it means restore process is not finished
|
||||||
|
walRemoveWalFiles(path);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// move all files to old directory
|
||||||
|
int count = 0;
|
||||||
|
while ((ent = readdir(dir))!= NULL) {
|
||||||
|
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
|
if (access(opath, F_OK) != 0) mkdir(opath, 0755);
|
||||||
|
|
||||||
|
sprintf(oname, "%s/%s", path, ent->d_name);
|
||||||
|
sprintf(nname, "%s/old/%s", path, ent->d_name);
|
||||||
|
if (rename(oname, nname) < 0) {
|
||||||
|
wError("wal:%s, failed to move to new:%s", oname, nname);
|
||||||
|
code = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wTrace("wal:%s, %d files are moved for restoration", path, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
closedir(dir);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int walRemoveWalFiles(char *path) {
|
||||||
|
int plen = strlen(walPrefix);
|
||||||
|
char name[TSDB_FILENAME_LEN];
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
if (access(path, F_OK) != 0) return 0;
|
||||||
|
|
||||||
|
struct dirent *ent;
|
||||||
|
DIR *dir = opendir(path);
|
||||||
|
|
||||||
|
while ((ent = readdir(dir))!= NULL) {
|
||||||
|
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
|
sprintf(name, "%s/%s", path, ent->d_name);
|
||||||
|
if (remove(name) <0) {
|
||||||
|
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||||
|
code = -1; break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
closedir(dir);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(../inc)
|
||||||
|
|
||||||
|
LIST(APPEND WALTEST_SRC ./waltest.c)
|
||||||
|
ADD_EXECUTABLE(waltest ${WALTEST_SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(waltest twal)
|
||||||
|
|
||||||
|
ENDIF ()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
//#define _DEFAULT_SOURCE
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "twal.h"
|
||||||
|
|
||||||
|
int64_t ver = 0;
|
||||||
|
void *pWal = NULL;
|
||||||
|
|
||||||
|
int writeToQueue(void *pVnode, void *data) {
|
||||||
|
SWalHead *pHead = (SWalHead *)data;
|
||||||
|
|
||||||
|
// do nothing
|
||||||
|
if (pHead->version > ver)
|
||||||
|
ver = pHead->version;
|
||||||
|
|
||||||
|
walWrite(pWal, pHead);
|
||||||
|
|
||||||
|
free(data);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
char path[128] = "/home/jhtao/test/wal";
|
||||||
|
int max = 3;
|
||||||
|
int level = 2;
|
||||||
|
int total = 5;
|
||||||
|
int rows = 10000;
|
||||||
|
int size = 128;
|
||||||
|
|
||||||
|
for (int i=1; i<argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
|
strcpy(path, argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||||
|
max = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-l")==0 && i < argc-1) {
|
||||||
|
level = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-r")==0 && i < argc-1) {
|
||||||
|
rows = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||||
|
total = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
|
||||||
|
size = atoi(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-v")==0 && i < argc-1) {
|
||||||
|
ver = atoll(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
|
||||||
|
ddebugFlag = atoi(argv[++i]);
|
||||||
|
} else {
|
||||||
|
printf("\nusage: %s [options] \n", argv[0]);
|
||||||
|
printf(" [-p path]: wal file path default is:%s\n", path);
|
||||||
|
printf(" [-m max]: max wal files, default is:%d\n", max);
|
||||||
|
printf(" [-l level]: log level, default is:%d\n", level);
|
||||||
|
printf(" [-t total]: total wal files, default is:%d\n", total);
|
||||||
|
printf(" [-r rows]: rows of records per wal file, default is:%d\n", rows);
|
||||||
|
printf(" [-v version]: initial version, default is:%ld\n", ver);
|
||||||
|
printf(" [-d debugFlag]: debug flag, default:%d\n", ddebugFlag);
|
||||||
|
printf(" [-h help]: print out this help\n\n");
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosInitLog("wal.log", 100000, 10);
|
||||||
|
|
||||||
|
pWal = walOpen(path, max, level);
|
||||||
|
if (pWal == NULL) {
|
||||||
|
printf("failed to open wal\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int ret = walRestore(pWal, NULL, writeToQueue);
|
||||||
|
if (ret <0) {
|
||||||
|
printf("failed to restore wal\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("version starts from:%ld\n", ver);
|
||||||
|
|
||||||
|
int contLen = sizeof(SWalHead) + size;
|
||||||
|
SWalHead *pHead = (SWalHead *) malloc(contLen);
|
||||||
|
|
||||||
|
for (int i=0; i<total; ++i) {
|
||||||
|
for (int k=0; k<rows; ++k) {
|
||||||
|
pHead->version = ++ver;
|
||||||
|
pHead->len = size;
|
||||||
|
walWrite(pWal, pHead);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("renew a wal, i:%d\n", i);
|
||||||
|
walRenew(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("%d wal files are written\n", total);
|
||||||
|
|
||||||
|
uint32_t index = 0;
|
||||||
|
char name[256];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int code = walGetWalFile(pWal, name, &index);
|
||||||
|
if (code == -1) {
|
||||||
|
printf("failed to get wal file, index:%d\n", index);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("index:%d wal:%s\n", index, name);
|
||||||
|
if (code == 0) break;
|
||||||
|
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
|
||||||
|
getchar();
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue