fix possible commit log init failure
This commit is contained in:
parent
d6cfd14e8b
commit
fbdeadbc23
|
@ -28,10 +28,11 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char action;
|
|
||||||
int sversion;
|
int sversion;
|
||||||
int sid;
|
int sid;
|
||||||
int contLen;
|
int contLen;
|
||||||
|
int action:8;
|
||||||
|
int simpleCheck:24;
|
||||||
} SCommitHead;
|
} SCommitHead;
|
||||||
|
|
||||||
int vnodeOpenCommitLog(int vnode, uint64_t firstV) {
|
int vnodeOpenCommitLog(int vnode, uint64_t firstV) {
|
||||||
|
@ -118,7 +119,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
|
||||||
|
|
||||||
fd = open(fileName, O_RDWR);
|
fd = open(fileName, O_RDWR);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
dError("vid:%d, failed to open:%s, reason:%s", vnode, fileName);
|
dError("vid:%d, failed to open:%s, reason:%s", vnode, fileName, strerror(errno));
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,22 +138,26 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCommitHead head;
|
SCommitHead head;
|
||||||
|
int simpleCheck = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
ret = read(fd, &head, sizeof(head));
|
ret = read(fd, &head, sizeof(head));
|
||||||
if (ret < 0) goto _error;
|
if (ret < 0) goto _error;
|
||||||
if (ret == 0) break;
|
if (ret == 0) break;
|
||||||
|
if (((head.sversion+head.sid+head.contLen+head.action) & 0xFFFFFF) != head.simpleCheck) break;
|
||||||
|
simpleCheck = head.simpleCheck;
|
||||||
|
|
||||||
// head.contLen validation is removed
|
// head.contLen validation is removed
|
||||||
if (head.sid >= pVnode->cfg.maxSessions || head.sid < 0 || head.action >= TSDB_ACTION_MAX) {
|
if (head.sid >= pVnode->cfg.maxSessions || head.sid < 0 || head.action >= TSDB_ACTION_MAX) {
|
||||||
dError("vid, invalid commit head, sid:%d contLen:%d action:%d", head.sid, head.contLen, head.action);
|
dError("vid, invalid commit head, sid:%d contLen:%d action:%d", head.sid, head.contLen, head.action);
|
||||||
} else {
|
} else {
|
||||||
if (head.contLen > 0) {
|
if (head.contLen > 0) {
|
||||||
if (bufLen < head.contLen) { // pre-allocated buffer is not enough
|
if (bufLen < head.contLen+sizeof(simpleCheck)) { // pre-allocated buffer is not enough
|
||||||
cont = realloc(cont, head.contLen);
|
cont = realloc(cont, head.contLen+sizeof(simpleCheck));
|
||||||
bufLen = head.contLen;
|
bufLen = head.contLen+sizeof(simpleCheck);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (read(fd, cont, head.contLen) < 0) goto _error;
|
if (read(fd, cont, head.contLen+sizeof(simpleCheck)) < 0) goto _error;
|
||||||
|
if (*(int *)(cont+head.contLen) != simpleCheck) break;
|
||||||
SMeterObj *pObj = pVnode->meterList[head.sid];
|
SMeterObj *pObj = pVnode->meterList[head.sid];
|
||||||
if (pObj == NULL) {
|
if (pObj == NULL) {
|
||||||
dError(
|
dError(
|
||||||
|
@ -171,7 +176,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
totalLen += sizeof(head) + head.contLen;
|
totalLen += sizeof(head) + head.contLen + sizeof(simpleCheck);
|
||||||
}
|
}
|
||||||
|
|
||||||
tclose(fd);
|
tclose(fd);
|
||||||
|
@ -253,19 +258,22 @@ int vnodeWriteToCommitLog(SMeterObj *pObj, char action, char *cont, int contLen,
|
||||||
head.action = action;
|
head.action = action;
|
||||||
head.sversion = pObj->sversion;
|
head.sversion = pObj->sversion;
|
||||||
head.contLen = contLen;
|
head.contLen = contLen;
|
||||||
|
head.simpleCheck = (head.sversion+head.sid+head.contLen+head.action) & 0xFFFFFF;
|
||||||
|
int simpleCheck = head.simpleCheck;
|
||||||
|
|
||||||
pthread_mutex_lock(&(pVnode->logMutex));
|
pthread_mutex_lock(&(pVnode->logMutex));
|
||||||
// 100 bytes redundant mem space
|
// 100 bytes redundant mem space
|
||||||
if (pVnode->mappingSize - (pVnode->pWrite - pVnode->pMem) < contLen + sizeof(SCommitHead) + 100) {
|
if (pVnode->mappingSize - (pVnode->pWrite - pVnode->pMem) < contLen + sizeof(SCommitHead) + sizeof(simpleCheck) + 100) {
|
||||||
pthread_mutex_unlock(&(pVnode->logMutex));
|
pthread_mutex_unlock(&(pVnode->logMutex));
|
||||||
dTrace("vid:%d, mem mapping space is not enough, wait for commit", pObj->vnode);
|
dTrace("vid:%d, mem mapping space is not enough, wait for commit", pObj->vnode);
|
||||||
vnodeProcessCommitTimer(pVnode, NULL);
|
vnodeProcessCommitTimer(pVnode, NULL);
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
char *pWrite = pVnode->pWrite;
|
char *pWrite = pVnode->pWrite;
|
||||||
pVnode->pWrite += sizeof(head) + contLen;
|
pVnode->pWrite += sizeof(head) + contLen + sizeof(simpleCheck);
|
||||||
memcpy(pWrite, (char *)&head, sizeof(head));
|
memcpy(pWrite, (char *)&head, sizeof(head));
|
||||||
memcpy(pWrite + sizeof(head), cont, contLen);
|
memcpy(pWrite + sizeof(head), cont, contLen);
|
||||||
|
memcpy(pWrite + sizeof(head) + contLen, &simpleCheck, sizeof(simpleCheck));
|
||||||
pthread_mutex_unlock(&(pVnode->logMutex));
|
pthread_mutex_unlock(&(pVnode->logMutex));
|
||||||
|
|
||||||
if (pVnode->pWrite - pVnode->pMem > pVnode->mappingThreshold) {
|
if (pVnode->pWrite - pVnode->pMem > pVnode->mappingThreshold) {
|
||||||
|
|
Loading…
Reference in New Issue