Merge pull request #2147 from taosdata/hotfix/coverity
fix the coveraity scan bugs
This commit is contained in:
commit
d035016817
|
@ -127,7 +127,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
|
||||||
|
|
||||||
hash = rpcHashConn(pCache, fqdn, port, connType);
|
hash = rpcHashConn(pCache, fqdn, port, connType);
|
||||||
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
|
pNode = (SConnHash *)taosMemPoolMalloc(pCache->connHashMemPool);
|
||||||
strcpy(pNode->fqdn, fqdn);
|
tstrncpy(pNode->fqdn, fqdn, sizeof(pNode->fqdn));
|
||||||
pNode->port = port;
|
pNode->port = port;
|
||||||
pNode->connType = connType;
|
pNode->connType = connType;
|
||||||
pNode->data = data;
|
pNode->data = data;
|
||||||
|
|
|
@ -60,7 +60,7 @@ typedef struct {
|
||||||
|
|
||||||
void *idPool; // handle to ID pool
|
void *idPool; // handle to ID pool
|
||||||
void *tmrCtrl; // handle to timer
|
void *tmrCtrl; // handle to timer
|
||||||
void *hash; // handle returned by hash utility
|
SHashObj *hash; // handle returned by hash utility
|
||||||
void *tcphandle;// returned handle from TCP initialization
|
void *tcphandle;// returned handle from TCP initialization
|
||||||
void *udphandle;// returned handle from UDP initialization
|
void *udphandle;// returned handle from UDP initialization
|
||||||
void *pCache; // connection cache
|
void *pCache; // connection cache
|
||||||
|
@ -211,7 +211,7 @@ void *rpcOpen(const SRpcInit *pInit) {
|
||||||
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
|
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
|
||||||
if (pRpc == NULL) return NULL;
|
if (pRpc == NULL) return NULL;
|
||||||
|
|
||||||
if(pInit->label) strcpy(pRpc->label, pInit->label);
|
if(pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
|
||||||
pRpc->connType = pInit->connType;
|
pRpc->connType = pInit->connType;
|
||||||
pRpc->idleTime = pInit->idleTime;
|
pRpc->idleTime = pInit->idleTime;
|
||||||
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
|
pRpc->numOfThreads = pInit->numOfThreads>TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS:pInit->numOfThreads;
|
||||||
|
@ -228,7 +228,7 @@ void *rpcOpen(const SRpcInit *pInit) {
|
||||||
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
||||||
pRpc->connList = (SRpcConn *)calloc(1, size);
|
pRpc->connList = (SRpcConn *)calloc(1, size);
|
||||||
if (pRpc->connList == NULL) {
|
if (pRpc->connList == NULL) {
|
||||||
tError("%s failed to allocate memory for taos connections, size:%d", pRpc->label, size);
|
tError("%s failed to allocate memory for taos connections, size:%ld", pRpc->label, size);
|
||||||
rpcClose(pRpc);
|
rpcClose(pRpc);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -459,7 +459,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
|
||||||
pInfo->clientPort = pConn->peerPort;
|
pInfo->clientPort = pConn->peerPort;
|
||||||
// pInfo->serverIp = pConn->destIp;
|
// pInfo->serverIp = pConn->destIp;
|
||||||
|
|
||||||
strcpy(pInfo->user, pConn->user);
|
strncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -503,10 +503,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
|
||||||
pConn = rpcAllocateClientConn(pRpc);
|
pConn = rpcAllocateClientConn(pRpc);
|
||||||
|
|
||||||
if (pConn) {
|
if (pConn) {
|
||||||
strcpy(pConn->peerFqdn, peerFqdn);
|
tstrncpy(pConn->peerFqdn, peerFqdn, sizeof(pConn->peerFqdn));
|
||||||
pConn->peerIp = peerIp;
|
pConn->peerIp = peerIp;
|
||||||
pConn->peerPort = peerPort;
|
pConn->peerPort = peerPort;
|
||||||
strcpy(pConn->user, pRpc->user);
|
tstrncpy(pConn->user, pRpc->user, sizeof(pConn->user));
|
||||||
pConn->connType = connType;
|
pConn->connType = connType;
|
||||||
|
|
||||||
if (taosOpenConn[connType]) {
|
if (taosOpenConn[connType]) {
|
||||||
|
@ -804,7 +804,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
|
||||||
|
|
||||||
pConn = rpcGetConnObj(pRpc, sid, pRecv);
|
pConn = rpcGetConnObj(pRpc, sid, pRecv);
|
||||||
if (pConn == NULL) {
|
if (pConn == NULL) {
|
||||||
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno));
|
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, (void *)pHead->ahandle, tstrerror(terrno));
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
if (rpcIsReq(pHead->msgType)) {
|
if (rpcIsReq(pHead->msgType)) {
|
||||||
|
|
|
@ -73,7 +73,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
|
||||||
pServerObj->ip = ip;
|
pServerObj->ip = ip;
|
||||||
pServerObj->port = port;
|
pServerObj->port = port;
|
||||||
strcpy(pServerObj->label, label);
|
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
|
||||||
pServerObj->numOfThreads = numOfThreads;
|
pServerObj->numOfThreads = numOfThreads;
|
||||||
|
|
||||||
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
|
||||||
|
@ -87,7 +87,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
|
||||||
pThreadObj = pServerObj->pThreadObj;
|
pThreadObj = pServerObj->pThreadObj;
|
||||||
for (int i = 0; i < numOfThreads; ++i) {
|
for (int i = 0; i < numOfThreads; ++i) {
|
||||||
pThreadObj->processData = fp;
|
pThreadObj->processData = fp;
|
||||||
strcpy(pThreadObj->label, label);
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
|
||||||
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
|
||||||
|
@ -247,7 +247,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
|
||||||
|
|
||||||
pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
|
pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
|
||||||
memset(pThreadObj, 0, sizeof(SThreadObj));
|
memset(pThreadObj, 0, sizeof(SThreadObj));
|
||||||
strcpy(pThreadObj->label, label);
|
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
|
||||||
pThreadObj->ip = ip;
|
pThreadObj->ip = ip;
|
||||||
pThreadObj->shandle = shandle;
|
pThreadObj->shandle = shandle;
|
||||||
|
|
||||||
|
|
|
@ -72,7 +72,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pSet->port = port;
|
pSet->port = port;
|
||||||
pSet->shandle = shandle;
|
pSet->shandle = shandle;
|
||||||
pSet->fp = fp;
|
pSet->fp = fp;
|
||||||
strcpy(pSet->label, label);
|
tstrncpy(pSet->label, label, sizeof(pSet->label));
|
||||||
|
|
||||||
uint16_t ownPort;
|
uint16_t ownPort;
|
||||||
for (int i = 0; i < threads; ++i) {
|
for (int i = 0; i < threads; ++i) {
|
||||||
|
@ -99,7 +99,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
|
||||||
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
pConn->localPort = (uint16_t)ntohs(sin.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(pConn->label, label);
|
tstrncpy(pConn->label, label, sizeof(pConn->label));
|
||||||
pConn->shandle = shandle;
|
pConn->shandle = shandle;
|
||||||
pConn->processData = fp;
|
pConn->processData = fp;
|
||||||
pConn->index = i;
|
pConn->index = i;
|
||||||
|
|
|
@ -76,6 +76,7 @@ int main(int argc, char *argv[]) {
|
||||||
int numOfReqs = 0;
|
int numOfReqs = 0;
|
||||||
int appThreads = 1;
|
int appThreads = 1;
|
||||||
char serverIp[40] = "127.0.0.1";
|
char serverIp[40] = "127.0.0.1";
|
||||||
|
char secret[TSDB_KEY_LEN] = "mypassword";
|
||||||
struct timeval systemTime;
|
struct timeval systemTime;
|
||||||
int64_t startTime, endTime;
|
int64_t startTime, endTime;
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
|
@ -97,7 +98,7 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 100;
|
||||||
rpcInit.idleTime = tsShellActivityTimer*1000;
|
rpcInit.idleTime = tsShellActivityTimer*1000;
|
||||||
rpcInit.user = "michael";
|
rpcInit.user = "michael";
|
||||||
rpcInit.secret = "mypassword";
|
rpcInit.secret = secret;
|
||||||
rpcInit.ckey = "key";
|
rpcInit.ckey = "key";
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
@ -106,7 +107,7 @@ int main(int argc, char *argv[]) {
|
||||||
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
if (strcmp(argv[i], "-p")==0 && i < argc-1) {
|
||||||
ipSet.port[0] = atoi(argv[++i]);
|
ipSet.port[0] = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-i") ==0 && i < argc-1) {
|
||||||
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn));
|
tstrncpy(ipSet.fqdn[0], argv[++i], sizeof(ipSet.fqdn[0]));
|
||||||
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
|
||||||
rpcInit.numOfThreads = atoi(argv[++i]);
|
rpcInit.numOfThreads = atoi(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
} else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
|
||||||
|
|
|
@ -77,6 +77,7 @@ int main(int argc, char *argv[]) {
|
||||||
int numOfReqs = 0;
|
int numOfReqs = 0;
|
||||||
int appThreads = 1;
|
int appThreads = 1;
|
||||||
char serverIp[40] = "127.0.0.1";
|
char serverIp[40] = "127.0.0.1";
|
||||||
|
char secret[TSDB_KEY_LEN] = "mypassword";
|
||||||
struct timeval systemTime;
|
struct timeval systemTime;
|
||||||
int64_t startTime, endTime;
|
int64_t startTime, endTime;
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
|
@ -98,7 +99,7 @@ int main(int argc, char *argv[]) {
|
||||||
rpcInit.sessions = 100;
|
rpcInit.sessions = 100;
|
||||||
rpcInit.idleTime = tsShellActivityTimer*1000;
|
rpcInit.idleTime = tsShellActivityTimer*1000;
|
||||||
rpcInit.user = "michael";
|
rpcInit.user = "michael";
|
||||||
rpcInit.secret = "mypassword";
|
rpcInit.secret = secret;
|
||||||
rpcInit.ckey = "key";
|
rpcInit.ckey = "key";
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
int taosGetFqdn(char *fqdn) {
|
int taosGetFqdn(char *fqdn) {
|
||||||
|
int code = 0;
|
||||||
char hostname[1024];
|
char hostname[1024];
|
||||||
hostname[1023] = '\0';
|
hostname[1023] = '\0';
|
||||||
gethostname(hostname, 1023);
|
gethostname(hostname, 1023);
|
||||||
|
@ -27,13 +28,15 @@ int taosGetFqdn(char *fqdn) {
|
||||||
h = gethostbyname(hostname);
|
h = gethostbyname(hostname);
|
||||||
if (h != NULL) {
|
if (h != NULL) {
|
||||||
strcpy(fqdn, h->h_name);
|
strcpy(fqdn, h->h_name);
|
||||||
return 0;
|
|
||||||
} else {
|
} else {
|
||||||
uError("failed to get host name");
|
uError("failed to get host name(%s)", strerror(errno));
|
||||||
return -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
free(h);
|
// to do: free the resources
|
||||||
|
// free(h);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t taosGetIpFromFqdn(const char *fqdn) {
|
uint32_t taosGetIpFromFqdn(const char *fqdn) {
|
||||||
|
@ -47,7 +50,7 @@ uint32_t ip2uint(const char *const ip_addr) {
|
||||||
char ip_addr_cpy[20];
|
char ip_addr_cpy[20];
|
||||||
char ip[5];
|
char ip[5];
|
||||||
|
|
||||||
strcpy(ip_addr_cpy, ip_addr);
|
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
|
||||||
|
|
||||||
char *s_start, *s_end;
|
char *s_start, *s_end;
|
||||||
s_start = ip_addr_cpy;
|
s_start = ip_addr_cpy;
|
||||||
|
@ -206,7 +209,7 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
|
||||||
int reuse, nocheck;
|
int reuse, nocheck;
|
||||||
int bufSize = 8192000;
|
int bufSize = 8192000;
|
||||||
|
|
||||||
uTrace("open udp socket:%s:%hu", ip, port);
|
uTrace("open udp socket:0x%x:%hu", ip, port);
|
||||||
|
|
||||||
memset((char *)&localAddr, 0, sizeof(localAddr));
|
memset((char *)&localAddr, 0, sizeof(localAddr));
|
||||||
localAddr.sin_family = AF_INET;
|
localAddr.sin_family = AF_INET;
|
||||||
|
@ -257,7 +260,7 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
|
||||||
|
|
||||||
/* bind socket to local address */
|
/* bind socket to local address */
|
||||||
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
|
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
|
||||||
uError("failed to bind udp socket: %d (%s), %s:%hu", errno, strerror(errno), ip, port);
|
uError("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
|
||||||
taosCloseSocket(sockFd);
|
taosCloseSocket(sockFd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -363,7 +366,7 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
|
||||||
int sockFd;
|
int sockFd;
|
||||||
int reuse;
|
int reuse;
|
||||||
|
|
||||||
uTrace("open tcp server socket:%s:%hu", ip, port);
|
uTrace("open tcp server socket:0x%x:%hu", ip, port);
|
||||||
|
|
||||||
bzero((char *)&serverAdd, sizeof(serverAdd));
|
bzero((char *)&serverAdd, sizeof(serverAdd));
|
||||||
serverAdd.sin_family = AF_INET;
|
serverAdd.sin_family = AF_INET;
|
||||||
|
|
|
@ -68,11 +68,19 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
pWal->num = 0;
|
pWal->num = 0;
|
||||||
pWal->level = pCfg->walLevel;
|
pWal->level = pCfg->walLevel;
|
||||||
pWal->keep = pCfg->keep;
|
pWal->keep = pCfg->keep;
|
||||||
strcpy(pWal->path, path);
|
tstrncpy(pWal->path, path, sizeof(pWal->path));
|
||||||
pthread_mutex_init(&pWal->mutex, NULL);
|
pthread_mutex_init(&pWal->mutex, NULL);
|
||||||
|
|
||||||
if (access(path, F_OK) != 0) mkdir(path, 0755);
|
if (access(path, F_OK) != 0) {
|
||||||
|
if (mkdir(path, 0755) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
|
||||||
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
|
free(pWal);
|
||||||
|
pWal = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pCfg->keep == 1) return pWal;
|
if (pCfg->keep == 1) return pWal;
|
||||||
|
|
||||||
if (walHandleExistingFiles(path) == 0)
|
if (walHandleExistingFiles(path) == 0)
|
||||||
|
@ -80,7 +88,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
|
|
||||||
if (pWal->fd <0) {
|
if (pWal->fd <0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("wal:%s, failed to open", path);
|
wError("wal:%s, failed to open(%s)", path, strerror(errno));
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
free(pWal);
|
free(pWal);
|
||||||
pWal = NULL;
|
pWal = NULL;
|
||||||
|
@ -119,7 +127,8 @@ void walClose(void *handle) {
|
||||||
int walRenew(void *handle) {
|
int walRenew(void *handle) {
|
||||||
if (handle == NULL) return 0;
|
if (handle == NULL) return 0;
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
|
@ -135,8 +144,8 @@ int walRenew(void *handle) {
|
||||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
|
|
||||||
if (pWal->fd < 0) {
|
if (pWal->fd < 0) {
|
||||||
wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
|
wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno));
|
||||||
code = -1;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
} else {
|
} else {
|
||||||
wTrace("wal:%s, it is created", pWal->name);
|
wTrace("wal:%s, it is created", pWal->name);
|
||||||
|
|
||||||
|
@ -156,14 +165,15 @@ int walRenew(void *handle) {
|
||||||
|
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walWrite(void *handle, SWalHead *pHead) {
|
int walWrite(void *handle, SWalHead *pHead) {
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
|
||||||
if (pWal == NULL) return -1;
|
if (pWal == NULL) return -1;
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||||
if (pHead->version <= pWal->version) return 0;
|
if (pHead->version <= pWal->version) return 0;
|
||||||
|
@ -174,12 +184,12 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
|
|
||||||
if(write(pWal->fd, pHead, contLen) != contLen) {
|
if(write(pWal->fd, pHead, contLen) != contLen) {
|
||||||
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
|
||||||
code = -1;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
} else {
|
} else {
|
||||||
pWal->version = pHead->version;
|
pWal->version = pHead->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
void walFsync(void *handle) {
|
void walFsync(void *handle) {
|
||||||
|
@ -196,11 +206,11 @@ void walFsync(void *handle) {
|
||||||
|
|
||||||
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
|
||||||
struct dirent *ent;
|
struct dirent *ent;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
uint32_t maxId = 0, minId = -1, index =0;
|
uint32_t maxId = 0, minId = -1, index =0;
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
int plen = strlen(walPrefix);
|
int plen = strlen(walPrefix);
|
||||||
char opath[TSDB_FILENAME_LEN+5];
|
char opath[TSDB_FILENAME_LEN+5];
|
||||||
|
|
||||||
|
@ -224,30 +234,30 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
if (pWal->keep) code = walRenew(pWal);
|
if (pWal->keep) terrno = walRenew(pWal);
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( count != (maxId-minId+1) ) {
|
if ( count != (maxId-minId+1) ) {
|
||||||
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
|
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
|
||||||
code = -1;
|
terrno = TAOS_SYSTEM_ERROR(TSDB_CODE_APP_ERROR);
|
||||||
} else {
|
} else {
|
||||||
wTrace("wal:%s, %d files will be restored", opath, count);
|
wTrace("wal:%s, %d files will be restored", opath, count);
|
||||||
|
|
||||||
for (index = minId; index<=maxId; ++index) {
|
for (index = minId; index<=maxId; ++index) {
|
||||||
sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index);
|
sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index);
|
||||||
code = walRestoreWalFile(pWal, pVnode, writeFp);
|
terrno = walRestoreWalFile(pWal, pVnode, writeFp);
|
||||||
if (code < 0) break;
|
if (terrno < 0) break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (terrno == 0) {
|
||||||
if (pWal->keep == 0) {
|
if (pWal->keep == 0) {
|
||||||
code = walRemoveWalFiles(opath);
|
terrno = walRemoveWalFiles(opath);
|
||||||
if (code == 0) {
|
if (terrno == 0) {
|
||||||
if (remove(opath) < 0) {
|
if (remove(opath) < 0) {
|
||||||
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
||||||
code = -1;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -258,12 +268,12 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
if (pWal->fd < 0) {
|
if (pWal->fd < 0) {
|
||||||
wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
|
wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
|
||||||
code = -1;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
|
@ -292,40 +302,47 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
||||||
int code = 0;
|
|
||||||
char *name = pWal->name;
|
char *name = pWal->name;
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
char *buffer = malloc(1024000); // size for one record
|
char *buffer = malloc(1024000); // size for one record
|
||||||
if (buffer == NULL) return -1;
|
if (buffer == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
SWalHead *pHead = (SWalHead *)buffer;
|
SWalHead *pHead = (SWalHead *)buffer;
|
||||||
|
|
||||||
int fd = open(name, O_RDONLY);
|
int fd = open(name, O_RDONLY);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
|
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
free(buffer);
|
free(buffer);
|
||||||
return -1;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
wTrace("wal:%s, start to restore", name);
|
wTrace("wal:%s, start to restore", name);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int ret = read(fd, pHead, sizeof(SWalHead));
|
int ret = read(fd, pHead, sizeof(SWalHead));
|
||||||
if ( ret == 0) { code = 0; break;}
|
if ( ret == 0) break;
|
||||||
|
|
||||||
if (ret != sizeof(SWalHead)) {
|
if (ret != sizeof(SWalHead)) {
|
||||||
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
|
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
|
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
|
||||||
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
|
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = read(fd, pHead->cont, pHead->len);
|
ret = read(fd, pHead->cont, pHead->len);
|
||||||
if ( ret != pHead->len) {
|
if ( ret != pHead->len) {
|
||||||
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
|
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,11 +353,10 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
|
||||||
close(fd);
|
close(fd);
|
||||||
free(buffer);
|
free(buffer);
|
||||||
|
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walHandleExistingFiles(const char *path) {
|
int walHandleExistingFiles(const char *path) {
|
||||||
int code = 0;
|
|
||||||
char oname[TSDB_FILENAME_LEN * 3];
|
char oname[TSDB_FILENAME_LEN * 3];
|
||||||
char nname[TSDB_FILENAME_LEN * 3];
|
char nname[TSDB_FILENAME_LEN * 3];
|
||||||
char opath[TSDB_FILENAME_LEN];
|
char opath[TSDB_FILENAME_LEN];
|
||||||
|
@ -350,6 +366,7 @@ int walHandleExistingFiles(const char *path) {
|
||||||
struct dirent *ent;
|
struct dirent *ent;
|
||||||
DIR *dir = opendir(path);
|
DIR *dir = opendir(path);
|
||||||
int plen = strlen(walPrefix);
|
int plen = strlen(walPrefix);
|
||||||
|
terrno = 0;
|
||||||
|
|
||||||
if (access(opath, F_OK) == 0) {
|
if (access(opath, F_OK) == 0) {
|
||||||
// old directory is there, it means restore process is not finished
|
// old directory is there, it means restore process is not finished
|
||||||
|
@ -360,13 +377,19 @@ int walHandleExistingFiles(const char *path) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while ((ent = readdir(dir))!= NULL) {
|
while ((ent = readdir(dir))!= NULL) {
|
||||||
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
if (access(opath, F_OK) != 0) mkdir(opath, 0755);
|
|
||||||
|
|
||||||
sprintf(oname, "%s/%s", path, ent->d_name);
|
sprintf(oname, "%s/%s", path, ent->d_name);
|
||||||
sprintf(nname, "%s/old/%s", path, ent->d_name);
|
sprintf(nname, "%s/old/%s", path, ent->d_name);
|
||||||
|
if (access(opath, F_OK) != 0) {
|
||||||
|
if (mkdir(opath, 0755) != 0) {
|
||||||
|
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (rename(oname, nname) < 0) {
|
if (rename(oname, nname) < 0) {
|
||||||
wError("wal:%s, failed to move to new:%s", oname, nname);
|
wError("wal:%s, failed to move to new:%s", oname, nname);
|
||||||
code = -1;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -378,14 +401,14 @@ int walHandleExistingFiles(const char *path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walRemoveWalFiles(const char *path) {
|
static int walRemoveWalFiles(const char *path) {
|
||||||
int plen = strlen(walPrefix);
|
int plen = strlen(walPrefix);
|
||||||
char name[TSDB_FILENAME_LEN * 3];
|
char name[TSDB_FILENAME_LEN * 3];
|
||||||
int code = 0;
|
|
||||||
|
terrno = 0;
|
||||||
if (access(path, F_OK) != 0) return 0;
|
if (access(path, F_OK) != 0) return 0;
|
||||||
|
|
||||||
struct dirent *ent;
|
struct dirent *ent;
|
||||||
|
@ -396,13 +419,13 @@ static int walRemoveWalFiles(const char *path) {
|
||||||
sprintf(name, "%s/%s", path, ent->d_name);
|
sprintf(name, "%s/%s", path, ent->d_name);
|
||||||
if (remove(name) <0) {
|
if (remove(name) <0) {
|
||||||
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
|
||||||
code = -1; break;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
||||||
return code;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue