From 4c60448baf9438d512dc0c252a955aa8c9bcb99b Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 27 Mar 2020 20:51:51 +0800 Subject: [PATCH 1/7] fix the bug for code add the code to handle idle timer --- src/rpc/src/rpcMain.c | 70 +++++++++++++++++++++++++++++++++--------- src/rpc/test/rserver.c | 36 ++++++++-------------- 2 files changed, 68 insertions(+), 38 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index c6f527a7d2..3b0137231f 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -182,6 +182,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext); static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); +static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); @@ -412,12 +413,12 @@ void rpcSendResponse(SRpcMsg *pMsg) { rpcFreeMsg(pConn->pRspMsg); pConn->pRspMsg = msg; pConn->rspMsgLen = msgLen; - if (pHead->content[0] == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; + if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; rpcUnlockConn(pConn); taosTmrStopA(&pConn->pTimer); - taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); + // taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); rpcSendMsgToPeer(pConn, msg, msgLen); pConn->secured = 1; // connection shall be secured @@ -653,8 +654,12 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { - tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); - rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + if (pHead->code == 0) { + tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); + } else { + // do nothing, it is heart beat from client + } } else if (pConn->inType == 0) { tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->inTranId); @@ -695,22 +700,23 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_INVALID_RESPONSE_TYPE; } - if (*pHead->content == TSDB_CODE_NOT_READY) { + if (pHead->code == TSDB_CODE_NOT_READY) { return TSDB_CODE_ALREADY_PROCESSED; } taosTmrStopA(&pConn->pTimer); pConn->retry = 0; - if (*pHead->content == TSDB_CODE_ACTION_IN_PROGRESS) { + if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - pConn->tretry++; tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); - taosTmrReset(rpcProcessRetryTimer, tsRpcProgressTime, pConn, pRpc->tmrCtrl, &pConn->pTimer); + pConn->tretry++; + rpcSendReqHead(pConn); + taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); return TSDB_CODE_ALREADY_PROCESSED; } else { // peer still in processing, give up - *pHead->content = TSDB_CODE_TOO_SLOW; + return TSDB_CODE_TOO_SLOW; } } @@ -771,6 +777,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { if ( rpcIsReq(pHead->msgType) ) { terrno = rpcProcessReqHead(pConn, pHead); pConn->connType = pRecv->connType; + taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer); } else { terrno = rpcProcessRspHead(pConn, pHead); } @@ -816,7 +823,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d", + tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->port); } @@ -825,7 +832,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { if (terrno != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { rpcSendErrorMsgToPeer(pRecv, terrno); - tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno); + tTrace("%s %p, %s is sent with error code:0x%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], terrno); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -885,7 +892,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { pHead = (SRpcHead *)msg; pHead->version = 1; pHead->msgType = pConn->inType+1; - pHead->spi = 0; + pHead->spi = pConn->spi; pHead->encrypt = 0; pHead->tranId = pConn->inTranId; pHead->sourceId = pConn->ownId; @@ -894,7 +901,29 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code) { memcpy(pHead->user, pConn->user, tListLen(pHead->user)); pHead->code = htonl(code); - rpcSendMsgToPeer(pConn, msg, 0); + rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); + pConn->secured = 1; // connection shall be secured +} + +static void rpcSendReqHead(SRpcConn *pConn) { + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pHead; + + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pHead = (SRpcHead *)msg; + pHead->version = 1; + pHead->msgType = pConn->outType; + pHead->spi = pConn->spi; + pHead->encrypt = 0; + pHead->tranId = pConn->outTranId; + pHead->sourceId = pConn->ownId; + pHead->destId = pConn->peerId; + pHead->linkUid = pConn->linkUid; + memcpy(pHead->user, pConn->user, tListLen(pHead->user)); + pHead->code = 1; + + rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead)); } static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { @@ -990,9 +1019,9 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d", + tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, - (uint8_t)pHead->content[0], msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); @@ -1070,6 +1099,17 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { if (pConn->user[0]) { tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); + if (pConn->inType && pRpc->cfp) { + // if there are pending request, notify the app + tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); + SRpcMsg rpcMsg; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; + rpcMsg.handle = pConn; + rpcMsg.msgType = pConn->inType; + rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; + (*(pRpc->cfp))(&rpcMsg); + } rpcCloseConn(pConn); } else { tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index 6c5b320809..deb6135cef 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -28,25 +28,23 @@ void *qhandle = NULL; void processShellMsg() { static int num = 0; taos_qall qall; - SRpcMsg *pRpcMsg, rpcMsg; - int type; - - qall = taosAllocateQall(); + SRpcMsg rpcMsg; while (1) { - int numOfMsgs = taosReadAllQitems(qhandle, qall); + int numOfMsgs = taosReadAllQitems(qhandle, &qall); if (numOfMsgs <= 0) { usleep(1000); continue; } tTrace("%d shell msgs are received", numOfMsgs); + sleep(5); for (int i=0; i=0) { - if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { + if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { tPrint("failed to write data file, reason:%s", strerror(errno)); } } @@ -65,22 +63,19 @@ void processShellMsg() { taosResetQitems(qall); for (int i=0; ipCont); - + rpcFreeCont(rpcMsg.pCont); rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; + rpcMsg.handle = rpcMsg.handle; rpcMsg.code = 1; rpcSendResponse(&rpcMsg); - - taosFreeQitem(pRpcMsg); } + taosFreeQitems(qall); } - taosFreeQall(qall); /* SRpcIpSet ipSet; ipSet.numOfIps = 1; @@ -114,21 +109,17 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char } void processRequestMsg(SRpcMsg *pMsg) { - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); + tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); + taosWriteQitem(qhandle, pMsg); } int main(int argc, char *argv[]) { SRpcInit rpcInit; char dataName[20] = "server.data"; + char localIp[40] = "0.0.0.0"; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = "0.0.0.0"; + rpcInit.localIp = localIp; rpcInit.localPort = 7000; rpcInit.label = "SER"; rpcInit.numOfThreads = 1; @@ -154,7 +145,6 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); - ddebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]); From e485bb24b20612539ae6c6764cbb69aa11b253e5 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 27 Mar 2020 21:39:11 +0800 Subject: [PATCH 2/7] roll back the rserver.c --- src/rpc/test/rserver.c | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index deb6135cef..d39caed5b0 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -28,23 +28,25 @@ void *qhandle = NULL; void processShellMsg() { static int num = 0; taos_qall qall; - SRpcMsg rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitems(qhandle, &qall); + int numOfMsgs = taosReadAllQitems(qhandle, qall); if (numOfMsgs <= 0) { usleep(1000); continue; } tTrace("%d shell msgs are received", numOfMsgs); - sleep(5); for (int i=0; i=0) { - if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { + if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { tPrint("failed to write data file, reason:%s", strerror(errno)); } } @@ -63,19 +65,22 @@ void processShellMsg() { taosResetQitems(qall); for (int i=0; ipCont); + rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = rpcMsg.handle; + rpcMsg.handle = pRpcMsg->handle; rpcMsg.code = 1; rpcSendResponse(&rpcMsg); + + taosFreeQitem(pRpcMsg); } - taosFreeQitems(qall); } + taosFreeQall(qall); /* SRpcIpSet ipSet; ipSet.numOfIps = 1; @@ -109,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char } void processRequestMsg(SRpcMsg *pMsg) { - tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); - taosWriteQitem(qhandle, pMsg); + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); + taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } int main(int argc, char *argv[]) { @@ -145,6 +155,7 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); + ddebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]); @@ -191,5 +202,3 @@ int main(int argc, char *argv[]) { return 0; } - - From 5e5f32b085df30dab5470d69cc9441b26690e730 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Fri, 27 Mar 2020 22:19:32 +0800 Subject: [PATCH 3/7] add the code to handle the broken link --- src/rpc/src/rpcMain.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 3b0137231f..9748f5d730 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -799,6 +799,18 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { pContext->code = TSDB_CODE_NETWORK_UNAVAIL; taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } + + if (pConn->inType) { + // if there are pending request, notify the app + tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); + SRpcMsg rpcMsg; + rpcMsg.pCont = NULL; + rpcMsg.contLen = 0; + rpcMsg.handle = pConn; + rpcMsg.msgType = pConn->inType; + rpcMsg.code = TSDB_CODE_NETWORK_UNAVAIL; + (*(pRpc->cfp))(&rpcMsg); + } rpcCloseConn(pConn); } From 92fa341fc56edeb94bf0d679ade08d493fed3683 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 28 Mar 2020 15:45:17 +0800 Subject: [PATCH 4/7] add the code for synchronous client API --- src/inc/trpc.h | 1 + src/rpc/src/rpcMain.c | 52 ++++++++- src/rpc/test/CMakeLists.txt | 4 + src/rpc/test/rclient.c | 8 +- src/rpc/test/rsclient.c | 212 ++++++++++++++++++++++++++++++++++++ src/rpc/test/rserver.c | 2 +- 6 files changed, 268 insertions(+), 11 deletions(-) create mode 100644 src/rpc/test/rsclient.c diff --git a/src/inc/trpc.h b/src/inc/trpc.h index e545abfed3..a34d107474 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -85,6 +85,7 @@ void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg); void rpcSendResponse(SRpcMsg *pMsg); void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 9748f5d730..e0d2191888 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -82,6 +82,9 @@ typedef struct { int8_t oldInUse; // server IP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type + SRpcMsg *pRsp; // for synchronous API + tsem_t *pSem; // for synchronous API + SRpcIpSet *pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -454,6 +457,26 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } +void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SRpcReqContext *pContext; + pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); + + memset(pRsp, 0, sizeof(SRpcMsg)); + + tsem_t sem; + tsem_init(&sem, 0, 0); + pContext->pSem = &sem; + pContext->pRsp = pRsp; + pContext->pSet = pIpSet; + + rpcSendRequest(shandle, pIpSet, pMsg); + + tsem_wait(&sem); + tsem_destroy(&sem); + + return; +} + static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); @@ -855,6 +878,26 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { return pConn; } +static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { + SRpcInfo *pRpc = pContext->pRpc; + + if (pContext->pRsp) { + // for synchronous API + tsem_post(pContext->pSem); + memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); + memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); + } else { + // for asynchronous API + if (pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect)) + (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet + + (*pRpc->cfp)(pMsg); + } + + // free the request message + rpcFreeCont(pContext->pCont); +} + static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; @@ -887,10 +930,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); rpcSendReqToServer(pRpc, pContext); } else { - if ( pRpc->ufp && (pContext->ipSet.inUse != pContext->oldInUse || pContext->redirect) ) - (*pRpc->ufp)(pContext->ahandle, &pContext->ipSet); // notify the update of ipSet - (*pRpc->cfp)(&rpcMsg); - rpcFreeCont(pContext->pCont); // free the request msg + rpcNotifyClient(pContext, &rpcMsg); } } } @@ -1059,8 +1099,8 @@ static void rpcProcessConnError(void *param, void *id) { rpcMsg.code = pContext->code; rpcMsg.pCont = NULL; rpcMsg.contLen = 0; - (*(pRpc->cfp))(&rpcMsg); - rpcFreeCont(pContext->pCont); // free the request msg + + rpcNotifyClient(pContext, &rpcMsg); } else { // move to next IP pContext->ipSet.inUse++; diff --git a/src/rpc/test/CMakeLists.txt b/src/rpc/test/CMakeLists.txt index 15780a396c..b519ae7578 100644 --- a/src/rpc/test/CMakeLists.txt +++ b/src/rpc/test/CMakeLists.txt @@ -11,6 +11,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ADD_EXECUTABLE(rclient ${CLIENT_SRC}) TARGET_LINK_LIBRARIES(rclient trpc) + LIST(APPEND SCLIENT_SRC ./rsclient.c) + ADD_EXECUTABLE(rsclient ${SCLIENT_SRC}) + TARGET_LINK_LIBRARIES(rsclient trpc) + LIST(APPEND SERVER_SRC ./rserver.c) ADD_EXECUTABLE(rserver ${SERVER_SRC}) TARGET_LINK_LIBRARIES(rserver trpc) diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index 562d0fff96..f000ab91a2 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -40,7 +40,7 @@ typedef struct { void *pRpc; } SInfo; -void processResponse(SRpcMsg *pMsg) { +static void processResponse(SRpcMsg *pMsg) { SInfo *pInfo = (SInfo *)pMsg->handle; tTrace("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); @@ -49,16 +49,16 @@ void processResponse(SRpcMsg *pMsg) { sem_post(&pInfo->rspSem); } -void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { +static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { SInfo *pInfo = (SInfo *)handle; tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); pInfo->ipSet = *pIpSet; } -int tcount = 0; +static int tcount = 0; -void *sendRequest(void *param) { +static void *sendRequest(void *param) { SInfo *pInfo = (SInfo *)param; SRpcMsg rpcMsg; diff --git a/src/rpc/test/rsclient.c b/src/rpc/test/rsclient.c new file mode 100644 index 0000000000..b99387e097 --- /dev/null +++ b/src/rpc/test/rsclient.c @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "os.h" +#include "tlog.h" +#include "trpc.h" +#include "taoserror.h" +#include +#include + +typedef struct { + int index; + SRpcIpSet ipSet; + int num; + int numOfReqs; + int msgSize; + sem_t rspSem; + sem_t *pOverSem; + pthread_t thread; + void *pRpc; +} SInfo; + +static void processUpdateIpSet(void *handle, SRpcIpSet *pIpSet) { + SInfo *pInfo = (SInfo *)handle; + + tTrace("thread:%d, ip set is changed, index:%d", pInfo->index, pIpSet->inUse); + pInfo->ipSet = *pIpSet; +} + +static int tcount = 0; +static int terror = 0; + +static void *sendRequest(void *param) { + SInfo *pInfo = (SInfo *)param; + SRpcMsg rpcMsg, rspMsg; + + tTrace("thread:%d, start to send request", pInfo->index); + + while ( pInfo->numOfReqs == 0 || pInfo->num < pInfo->numOfReqs) { + pInfo->num++; + rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); + rpcMsg.contLen = pInfo->msgSize; + rpcMsg.handle = pInfo; + rpcMsg.msgType = 1; + tTrace("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + + rpcSendRecv(pInfo->pRpc, &pInfo->ipSet, &rpcMsg, &rspMsg); + + // handle response + if (rspMsg.code != 0) terror++; + + tTrace("thread:%d, rspLen:%d code:%d", pInfo->index, rspMsg.contLen, rspMsg.code); + + rpcFreeCont(rspMsg.pCont); + + if ( pInfo->num % 20000 == 0 ) + tPrint("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); + } + + tTrace("thread:%d, it is over", pInfo->index); + tcount++; + + return NULL; +} + +int main(int argc, char *argv[]) { + SRpcInit rpcInit; + SRpcIpSet ipSet; + int msgSize = 128; + int numOfReqs = 0; + int appThreads = 1; + char serverIp[40] = "127.0.0.1"; + struct timeval systemTime; + int64_t startTime, endTime; + pthread_attr_t thattr; + + // server info + ipSet.numOfIps = 1; + ipSet.inUse = 0; + ipSet.port = 7000; + ipSet.ip[0] = inet_addr(serverIp); + ipSet.ip[1] = inet_addr("192.168.0.1"); + + // client info + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = "0.0.0.0"; + rpcInit.localPort = 0; + rpcInit.label = "APP"; + rpcInit.numOfThreads = 1; + // rpcInit.cfp = processResponse; + rpcInit.ufp = processUpdateIpSet; + rpcInit.sessions = 100; + rpcInit.idleTime = tsShellActivityTimer*1000; + rpcInit.user = "michael"; + rpcInit.secret = "mypassword"; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.connType = TAOS_CONN_CLIENT; + + for (int i=1; iindex = i; + pInfo->ipSet = ipSet; + pInfo->numOfReqs = numOfReqs; + pInfo->msgSize = msgSize; + sem_init(&pInfo->rspSem, 0, 0); + pInfo->pRpc = pRpc; + pthread_create(&pInfo->thread, &thattr, sendRequest, pInfo); + pInfo++; + } + + do { + usleep(1); + } while ( tcount < appThreads); + + gettimeofday(&systemTime, NULL); + endTime = systemTime.tv_sec*1000000 + systemTime.tv_usec; + float usedTime = (endTime - startTime)/1000.0; // mseconds + + tPrint("it takes %.3f mseconds to send %d requests to server, error num:%d", usedTime, numOfReqs*appThreads, terror); + tPrint("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0*numOfReqs*appThreads/usedTime, msgSize); + + taosCloseLogger(); + + return 0; +} + + diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d39caed5b0..48ae02a1d5 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -72,7 +72,7 @@ void processShellMsg() { rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 1; + rpcMsg.code = 0; rpcSendResponse(&rpcMsg); taosFreeQitem(pRpcMsg); From 87bf983d488ad915853d2e79e4e58ca248ef6c59 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 29 Mar 2020 23:28:06 +0800 Subject: [PATCH 5/7] first draft for WAL --- src/vnode/wal/CMakeLists.txt | 15 +- src/vnode/wal/inc/{vnodeWal.h => twal.h} | 31 ++- src/vnode/wal/src/vnodeWal.c | 27 -- src/vnode/wal/src/walMain.c | 316 +++++++++++++++++++++++ src/vnode/wal/test/CMakeLists.txt | 16 ++ src/vnode/wal/test/waltest.c | 112 ++++++++ 6 files changed, 481 insertions(+), 36 deletions(-) rename src/vnode/wal/inc/{vnodeWal.h => twal.h} (56%) delete mode 100644 src/vnode/wal/src/vnodeWal.c create mode 100644 src/vnode/wal/src/walMain.c create mode 100644 src/vnode/wal/test/CMakeLists.txt create mode 100644 src/vnode/wal/test/waltest.c diff --git a/src/vnode/wal/CMakeLists.txt b/src/vnode/wal/CMakeLists.txt index 1de958f84e..d77a235bb9 100644 --- a/src/vnode/wal/CMakeLists.txt +++ b/src/vnode/wal/CMakeLists.txt @@ -1,4 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +INCLUDE_DIRECTORIES(inc) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) -ADD_LIBRARY(wal ${SRC}) -TARGET_INCLUDE_DIRECTORIES(wal PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/inc) \ No newline at end of file +ADD_LIBRARY(twal ${SRC}) +TARGET_LINK_LIBRARIES(twal tutil) + +ADD_SUBDIRECTORY(test) + diff --git a/src/vnode/wal/inc/vnodeWal.h b/src/vnode/wal/inc/twal.h similarity index 56% rename from src/vnode/wal/inc/vnodeWal.h rename to src/vnode/wal/inc/twal.h index 7753e4ecca..34491c993d 100644 --- a/src/vnode/wal/inc/vnodeWal.h +++ b/src/vnode/wal/inc/twal.h @@ -14,19 +14,36 @@ */ #ifndef _TD_WAL_H_ #define _TD_WAL_H_ -#include #ifdef __cplusplus extern "C" { #endif -typedef void walh; // WAL HANDLE +#define TAOS_WAL_NOLOG 0 +#define TAOS_WAL_WRITE 1 +#define TAOS_WAL_FSYNC 2 + +typedef struct { + uint32_t signature; + uint32_t cksum; + int8_t msgType; + int8_t reserved[3]; + int32_t len; + uint64_t version; + char cont[]; +} SWalHead; + +typedef void* twal_h; // WAL HANDLE + +twal_h walOpen(char *path, int max, int level); +void walClose(twal_h); +int walRenew(twal_h); +int walWrite(twal_h, SWalHead *); +void walFsync(twal_h); +int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); + +extern int wDebugFlag; -walh *vnodeOpenWal(int vnode, uint8_t op); -int vnodeCloseWal(walh *pWal); -int vnodeRenewWal(walh *pWal); -int vnodeWriteWal(walh *pWal, void *cont, int contLen); -int vnodeSyncWal(walh *pWal); #ifdef __cplusplus } diff --git a/src/vnode/wal/src/vnodeWal.c b/src/vnode/wal/src/vnodeWal.c deleted file mode 100644 index 528cc97ed6..0000000000 --- a/src/vnode/wal/src/vnodeWal.c +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include "vnodeWal.h" - -typedef struct { - /* TODO */ -} SWal; - -walh *vnodeOpenWal(int vnode, uint8_t op) { return NULL; } -int vnodeCloseWal(walh *pWal) { return 0; } -int vnodeRenewWal(walh *pWal) { return 0; } -int vnodeWriteWal(walh *pWal, void *cont, int contLen) { return 0; } -int vnodeSyncWal(walh *pWal) { return 0; } \ No newline at end of file diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c new file mode 100644 index 0000000000..b3d14eb179 --- /dev/null +++ b/src/vnode/wal/src/walMain.c @@ -0,0 +1,316 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "os.h" +#include "tlog.h" +#include "tutil.h" +#include "twal.h" + +#define walPrefix "wal" +#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);} +#define wWarn(...) if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);} +#define wTrace(...) if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);} +#define wPrint(...) {tprintf("WAL ", 255, __VA_ARGS__);} + +typedef struct { + int fd; + int level; + int max; // maximum number of wal files + uint32_t id; // increase continuously + int num; // number of wal files + char path[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN]; +} SWal; + +int wDebugFlag = 135; + +static uint32_t walSignature = 0xFAFBFDFE; +static int walHandleExistingFiles(char *path); +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)); +static int walRemoveWalFiles(char *path); + +void *walOpen(char *path, int max, int level) { + SWal *pWal = calloc(sizeof(SWal), 1); + if (pWal == NULL) return NULL; + + pWal->fd = -1; + pWal->max = max; + pWal->id = 0; + pWal->num = 0; + pWal->level = level; + strcpy(pWal->path, path); + + if (access(path, F_OK) != 0) mkdir(path, 0755); + + if (walHandleExistingFiles(path) == 0) + walRenew(pWal); + + if (pWal->fd <0) { + wError("wal:%s, failed to open", path); + free(pWal); + pWal = NULL; + } + + return pWal; +} + +void walClose(void *handle) { + + SWal *pWal = (SWal *)handle; + + close(pWal->fd); + + // remove all files in the directory + for (int i=0; inum; ++i) { + sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id-i); + if (remove(pWal->name) <0) { + wError("wal:%s, failed to remove", pWal->name); + } else { + wTrace("wal:%s, it is removed", pWal->name); + } + } +} + +int walRenew(twal_h handle) { + SWal *pWal = (SWal *)handle; + + if (pWal->fd >=0) { + close(pWal->fd); + pWal->id++; + wTrace("wal:%s, it is closed", pWal->name); + } + + sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id); + pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { + wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno)); + return -1; + } + + wTrace("wal:%s, it is open", pWal->name); + + pWal->num++; + if (pWal->num > pWal->max) { + // remove the oldest wal file + char name[TSDB_FILENAME_LEN]; + sprintf(name, "%s/%s%010d", pWal->path, walPrefix, pWal->id - pWal->max); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + } else { + wTrace("wal:%s, it is removed", name); + } + + pWal->num--; + } + + return 0; +} + +int walWrite(void *handle, SWalHead *pHead) { + SWal *pWal = (SWal *)handle; + int code = 0; + + // no wal + if (pWal->level == TAOS_WAL_NOLOG) return 0; + + pHead->signature = walSignature; + int contLen = pHead->len + sizeof(SWalHead); + + if(write(pWal->fd, pHead, contLen) != contLen) { + wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); + code = -1; + } + + return code; +} + +void walFsync(void *handle) { + + SWal *pWal = (SWal *)handle; + + if (pWal->level == TAOS_WAL_FSYNC) + fsync(pWal->fd); +} + +int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { + SWal *pWal = (SWal *)handle; + int code = 0; + struct dirent *ent; + int count = 0; + uint32_t maxId = 0, minId = -1, index =0; + + int plen = strlen(walPrefix); + char opath[TSDB_FILENAME_LEN]; + sprintf(opath, "%s/old", pWal->path); + + // is there old directory? + if (access(opath, F_OK)) return 0; + + DIR *dir = opendir(opath); + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + index = atol(ent->d_name + plen); + if (index > maxId) maxId = index; + if (index < minId) minId = index; + count++; + } + } + + if ( count != (maxId-minId+1) ) { + wError("wal:%s, messed up, count:%d max:%ld min:%ld", opath, count, maxId, minId); + code = -1; + } else { + wTrace("wal:%s, %d files will be restored", opath, count); + + for (index = minId; index<=maxId; ++index) { + sprintf(pWal->name, "%s/old/%s%010d", pWal->path, walPrefix, index); + code = walRestoreWalFile(pWal->name, pVnode, writeFp); + if (code < 0) break; + } + } + + if (code == 0) { + code = walRemoveWalFiles(opath); + if (code == 0) { + if (remove(opath) < 0) { + wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno)); + code = -1; + } + } + } + + closedir(dir); + + return code; +} + +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { + SWalHead walHead; + int code = -1; + + int fd = open(name, O_RDONLY); + if (fd < 0) { + wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); + return -1; + } + + wTrace("wal:%s, start to restore", name); + + while (1) { + int ret = read(fd, &walHead, sizeof(walHead)); + if ( ret == 0) { code = 0; break;} + + if (ret != sizeof(walHead)) { + wError("wal:%s, failed to read(%s)", name, strerror(errno)); + break; + } + + if (walHead.signature != walSignature) { + wError("wal:%s, file is messed up, signature:", name, walHead.signature); + break; + } + + char *buffer = malloc(sizeof(SWalHead) + walHead.len); + memcpy(buffer, &walHead, sizeof(walHead)); + + ret = read(fd, buffer+sizeof(walHead), walHead.len); + if ( ret != walHead.len) { + wError("wal:%s, failed to read(%s)", name, strerror(errno)); + break; + } + + // write into queue + (*writeFp)(pVnode, buffer); + } + + return code; +} + +int walHandleExistingFiles(char *path) { + int code = 0; + char oname[TSDB_FILENAME_LEN]; + char nname[TSDB_FILENAME_LEN]; + char opath[TSDB_FILENAME_LEN]; + + sprintf(opath, "%s/old", path); + + struct dirent *ent; + DIR *dir = opendir(path); + int plen = strlen(walPrefix); + + if (access(opath, F_OK) == 0) { + // old directory is there, it means restore process is not finished + walRemoveWalFiles(path); + + } else { + // move all files to old directory + int count = 0; + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + if (access(opath, F_OK) != 0) mkdir(opath, 0755); + + sprintf(oname, "%s/%s", path, ent->d_name); + sprintf(nname, "%s/old/%s", path, ent->d_name); + if (rename(oname, nname) < 0) { + wError("wal:%s, failed to move to new:%s", oname, nname); + code = -1; + break; + } + + count++; + } + } + + wTrace("wal:%s, %d files are moved for restoration", path, count); + } + + closedir(dir); + return code; +} + +static int walRemoveWalFiles(char *path) { + int plen = strlen(walPrefix); + char name[TSDB_FILENAME_LEN]; + int code = 0; + + if (access(path, F_OK) != 0) return 0; + + struct dirent *ent; + DIR *dir = opendir(path); + + while ((ent = readdir(dir))!= NULL) { + if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + sprintf(name, "%s/%s", path, ent->d_name); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + code = -1; break; + } + } + } + + closedir(dir); + + return code; +} + + diff --git a/src/vnode/wal/test/CMakeLists.txt b/src/vnode/wal/test/CMakeLists.txt new file mode 100644 index 0000000000..06591def40 --- /dev/null +++ b/src/vnode/wal/test/CMakeLists.txt @@ -0,0 +1,16 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) + INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) + INCLUDE_DIRECTORIES(../inc) + + LIST(APPEND WALTEST_SRC ./waltest.c) + ADD_EXECUTABLE(waltest ${WALTEST_SRC}) + TARGET_LINK_LIBRARIES(waltest twal) + +ENDIF () + + diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c new file mode 100644 index 0000000000..cdeed6b3d4 --- /dev/null +++ b/src/vnode/wal/test/waltest.c @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +//#define _DEFAULT_SOURCE +#include +#include "tlog.h" +#include "twal.h" + +int64_t ver = 0; +void *pWal = NULL; + +int writeToQueue(void *pVnode, void *data) { + SWalHead *pHead = (SWalHead *)data; + + // do nothing + if (pHead->version > ver) + ver = pHead->version; + + walWrite(pWal, pHead); + + free(data); + + return 0; +} + +int main(int argc, char *argv[]) { + char path[128] = "/home/jhtao/test/wal"; + int max = 3; + int level = 2; + int total = 5; + int rows = 10000; + int size = 128; + + for (int i=1; iversion = ++ver; + walWrite(pWal, pHead); + } + + printf("renew a wal, i:%d\n", i); + walRenew(pWal); + } + + printf("%d wal files are written\n", total); + getchar(); + + walClose(pWal); + + return 0; +} From 6b02bd2630339024f009e2d479c0396ecd8b01ed Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Mon, 30 Mar 2020 08:53:44 +0800 Subject: [PATCH 6/7] add API walGetWalFile for synchronization --- src/vnode/wal/inc/twal.h | 5 +- src/vnode/wal/src/walMain.c | 97 +++++++++++++++++++++++++----------- src/vnode/wal/test/waltest.c | 18 +++++++ 3 files changed, 90 insertions(+), 30 deletions(-) diff --git a/src/vnode/wal/inc/twal.h b/src/vnode/wal/inc/twal.h index 34491c993d..fffbb45db2 100644 --- a/src/vnode/wal/inc/twal.h +++ b/src/vnode/wal/inc/twal.h @@ -24,12 +24,12 @@ extern "C" { #define TAOS_WAL_FSYNC 2 typedef struct { - uint32_t signature; - uint32_t cksum; int8_t msgType; int8_t reserved[3]; int32_t len; uint64_t version; + uint32_t signature; + uint32_t cksum; char cont[]; } SWalHead; @@ -41,6 +41,7 @@ int walRenew(twal_h); int walWrite(twal_h, SWalHead *); void walFsync(twal_h); int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); +int walGetWalFile(twal_h, char *name, int32_t *index); extern int wDebugFlag; diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index b3d14eb179..6751940d1b 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -23,6 +23,7 @@ #include "os.h" #include "tlog.h" +#include "tchecksum.h" #include "tutil.h" #include "twal.h" @@ -36,10 +37,11 @@ typedef struct { int fd; int level; int max; // maximum number of wal files - uint32_t id; // increase continuously + uint64_t id; // increase continuously int num; // number of wal files char path[TSDB_FILENAME_LEN]; char name[TSDB_FILENAME_LEN]; + pthread_mutex_t mutex; } SWal; int wDebugFlag = 135; @@ -59,6 +61,7 @@ void *walOpen(char *path, int max, int level) { pWal->num = 0; pWal->level = level; strcpy(pWal->path, path); + pthread_mutex_init(&pWal->mutex, NULL); if (access(path, F_OK) != 0) mkdir(path, 0755); @@ -67,9 +70,10 @@ void *walOpen(char *path, int max, int level) { if (pWal->fd <0) { wError("wal:%s, failed to open", path); + pthread_mutex_destroy(&pWal->mutex); free(pWal); pWal = NULL; - } + } return pWal; } @@ -82,48 +86,59 @@ void walClose(void *handle) { // remove all files in the directory for (int i=0; inum; ++i) { - sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id-i); + sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id-i); if (remove(pWal->name) <0) { wError("wal:%s, failed to remove", pWal->name); } else { wTrace("wal:%s, it is removed", pWal->name); } } + + pthread_mutex_destroy(&pWal->mutex); + + free(pWal); } int walRenew(twal_h handle) { SWal *pWal = (SWal *)handle; + int code = 0; + pthread_mutex_lock(&pWal->mutex); + if (pWal->fd >=0) { close(pWal->fd); pWal->id++; wTrace("wal:%s, it is closed", pWal->name); } - sprintf(pWal->name, "%s/%s%010d", pWal->path, walPrefix, pWal->id); + pWal->num++; + + sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + if (pWal->fd < 0) { wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno)); - return -1; - } + code = -1; + } else { + wTrace("wal:%s, it is created", pWal->name); - wTrace("wal:%s, it is open", pWal->name); + if (pWal->num > pWal->max) { + // remove the oldest wal file + char name[TSDB_FILENAME_LEN]; + sprintf(name, "%s/%s%ld", pWal->path, walPrefix, pWal->id - pWal->max); + if (remove(name) <0) { + wError("wal:%s, failed to remove(%s)", name, strerror(errno)); + } else { + wTrace("wal:%s, it is removed", name); + } - pWal->num++; - if (pWal->num > pWal->max) { - // remove the oldest wal file - char name[TSDB_FILENAME_LEN]; - sprintf(name, "%s/%s%010d", pWal->path, walPrefix, pWal->id - pWal->max); - if (remove(name) <0) { - wError("wal:%s, failed to remove(%s)", name, strerror(errno)); - } else { - wTrace("wal:%s, it is removed", name); + pWal->num--; } - - pWal->num--; } - return 0; + pthread_mutex_unlock(&pWal->mutex); + + return code; } int walWrite(void *handle, SWalHead *pHead) { @@ -134,6 +149,7 @@ int walWrite(void *handle, SWalHead *pHead) { if (pWal->level == TAOS_WAL_NOLOG) return 0; pHead->signature = walSignature; + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal)); int contLen = pHead->len + sizeof(SWalHead); if(write(pWal->fd, pHead, contLen) != contLen) { @@ -157,7 +173,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { int code = 0; struct dirent *ent; int count = 0; - uint32_t maxId = 0, minId = -1, index =0; + uint64_t maxId = 0, minId = -1, index =0; int plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN]; @@ -169,7 +185,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { DIR *dir = opendir(opath); while ((ent = readdir(dir))!= NULL) { if ( strncmp(ent->d_name, walPrefix, plen) == 0) { - index = atol(ent->d_name + plen); + index = atoll(ent->d_name + plen); if (index > maxId) maxId = index; if (index < minId) minId = index; count++; @@ -183,7 +199,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { wTrace("wal:%s, %d files will be restored", opath, count); for (index = minId; index<=maxId; ++index) { - sprintf(pWal->name, "%s/old/%s%010d", pWal->path, walPrefix, index); + sprintf(pWal->name, "%s/old/%s%ld", pWal->path, walPrefix, index); code = walRestoreWalFile(pWal->name, pVnode, writeFp); if (code < 0) break; } @@ -204,9 +220,34 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { return code; } +int walGetWalFile(void *handle, char *name, int32_t *index) { + SWal *pWal = (SWal *)handle; + int code = 1; + int32_t first = 0; + + name[0] = 0; + if (pWal == NULL || pWal->num == 0) return 0; + + pthread_mutex_lock(&(pWal->mutex)); + + first = pWal->id + 1 - pWal->num; + if (*index == 0) *index = first; // set to first one + + if (*index < first && *index > pWal->id) { + code = -1; // index out of range + } else { + sprintf(name, "%s/%s%ld", pWal->path, walPrefix, *index); + code = (*index == pWal->id) ? 0:1; + } + + pthread_mutex_unlock(&(pWal->mutex)); + + return code; +} + static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) { SWalHead walHead; - int code = -1; + int code = 0; int fd = open(name, O_RDONLY); if (fd < 0) { @@ -221,21 +262,21 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, vo if ( ret == 0) { code = 0; break;} if (ret != sizeof(walHead)) { - wError("wal:%s, failed to read(%s)", name, strerror(errno)); + wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno)); break; } - if (walHead.signature != walSignature) { - wError("wal:%s, file is messed up, signature:", name, walHead.signature); + if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) { + wWarn("wal:%s, cksum is messed up, skip the rest of file", name); break; - } + } char *buffer = malloc(sizeof(SWalHead) + walHead.len); memcpy(buffer, &walHead, sizeof(walHead)); ret = read(fd, buffer+sizeof(walHead), walHead.len); if ( ret != walHead.len) { - wError("wal:%s, failed to read(%s)", name, strerror(errno)); + wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret); break; } diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c index cdeed6b3d4..768c9d54d7 100644 --- a/src/vnode/wal/test/waltest.c +++ b/src/vnode/wal/test/waltest.c @@ -96,6 +96,7 @@ int main(int argc, char *argv[]) { for (int i=0; iversion = ++ver; + pHead->len = size; walWrite(pWal, pHead); } @@ -104,6 +105,23 @@ int main(int argc, char *argv[]) { } printf("%d wal files are written\n", total); + + int32_t index = 0; + char name[256]; + + while (1) { + int code = walGetWalFile(pWal, name, &index); + if (code == -1) { + printf("failed to get wal file, index:%d\n", index); + break; + } + + printf("index:%d wal:%s\n", index, name); + if (code == 0) break; + + index++; + } + getchar(); walClose(pWal); From 7337f7593da6ef04bab81b8eda4c726494431006 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Mon, 30 Mar 2020 12:13:09 +0800 Subject: [PATCH 7/7] add new API walGetWalFile --- src/vnode/wal/inc/twal.h | 2 +- src/vnode/wal/src/walMain.c | 20 ++++++++++---------- src/vnode/wal/test/waltest.c | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/vnode/wal/inc/twal.h b/src/vnode/wal/inc/twal.h index fffbb45db2..49fcde9e28 100644 --- a/src/vnode/wal/inc/twal.h +++ b/src/vnode/wal/inc/twal.h @@ -41,7 +41,7 @@ int walRenew(twal_h); int walWrite(twal_h, SWalHead *); void walFsync(twal_h); int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead)); -int walGetWalFile(twal_h, char *name, int32_t *index); +int walGetWalFile(twal_h, char *name, uint32_t *index); extern int wDebugFlag; diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index 6751940d1b..f327c28ce3 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -37,7 +37,7 @@ typedef struct { int fd; int level; int max; // maximum number of wal files - uint64_t id; // increase continuously + uint32_t id; // increase continuously int num; // number of wal files char path[TSDB_FILENAME_LEN]; char name[TSDB_FILENAME_LEN]; @@ -86,7 +86,7 @@ void walClose(void *handle) { // remove all files in the directory for (int i=0; inum; ++i) { - sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id-i); + sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id-i); if (remove(pWal->name) <0) { wError("wal:%s, failed to remove", pWal->name); } else { @@ -113,7 +113,7 @@ int walRenew(twal_h handle) { pWal->num++; - sprintf(pWal->name, "%s/%s%ld", pWal->path, walPrefix, pWal->id); + sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id); pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pWal->fd < 0) { @@ -125,7 +125,7 @@ int walRenew(twal_h handle) { if (pWal->num > pWal->max) { // remove the oldest wal file char name[TSDB_FILENAME_LEN]; - sprintf(name, "%s/%s%ld", pWal->path, walPrefix, pWal->id - pWal->max); + sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); if (remove(name) <0) { wError("wal:%s, failed to remove(%s)", name, strerror(errno)); } else { @@ -173,7 +173,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { int code = 0; struct dirent *ent; int count = 0; - uint64_t maxId = 0, minId = -1, index =0; + uint32_t maxId = 0, minId = -1, index =0; int plen = strlen(walPrefix); char opath[TSDB_FILENAME_LEN]; @@ -185,7 +185,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { DIR *dir = opendir(opath); while ((ent = readdir(dir))!= NULL) { if ( strncmp(ent->d_name, walPrefix, plen) == 0) { - index = atoll(ent->d_name + plen); + index = atol(ent->d_name + plen); if (index > maxId) maxId = index; if (index < minId) minId = index; count++; @@ -193,13 +193,13 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { } if ( count != (maxId-minId+1) ) { - wError("wal:%s, messed up, count:%d max:%ld min:%ld", opath, count, maxId, minId); + wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); code = -1; } else { wTrace("wal:%s, %d files will be restored", opath, count); for (index = minId; index<=maxId; ++index) { - sprintf(pWal->name, "%s/old/%s%ld", pWal->path, walPrefix, index); + sprintf(pWal->name, "%s/old/%s%d", pWal->path, walPrefix, index); code = walRestoreWalFile(pWal->name, pVnode, writeFp); if (code < 0) break; } @@ -220,7 +220,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) { return code; } -int walGetWalFile(void *handle, char *name, int32_t *index) { +int walGetWalFile(void *handle, char *name, uint32_t *index) { SWal *pWal = (SWal *)handle; int code = 1; int32_t first = 0; @@ -236,7 +236,7 @@ int walGetWalFile(void *handle, char *name, int32_t *index) { if (*index < first && *index > pWal->id) { code = -1; // index out of range } else { - sprintf(name, "%s/%s%ld", pWal->path, walPrefix, *index); + sprintf(name, "%s/%s%d", pWal->path, walPrefix, *index); code = (*index == pWal->id) ? 0:1; } diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c index 768c9d54d7..e90b54d1f3 100644 --- a/src/vnode/wal/test/waltest.c +++ b/src/vnode/wal/test/waltest.c @@ -106,7 +106,7 @@ int main(int argc, char *argv[]) { printf("%d wal files are written\n", total); - int32_t index = 0; + uint32_t index = 0; char name[256]; while (1) {