data loss due to too new data

This commit is contained in:
slguan 2019-08-04 20:49:32 +08:00
parent b96f79625a
commit 534c67b6fb
5 changed files with 16 additions and 27 deletions

View File

@ -419,10 +419,6 @@ void vnodeCommitOver(SVnodeObj *pVnode);
TSKEY vnodeGetFirstKey(int vnode); TSKEY vnodeGetFirstKey(int vnode);
int vnodeSyncRetrieveCache(int vnode, int fd);
int vnodeSyncRestoreCache(int vnode, int fd);
pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode); pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode);
void vnodeCancelCommit(SVnodeObj *pVnode); void vnodeCancelCommit(SVnodeObj *pVnode);
@ -448,10 +444,6 @@ void *vnodeCommitToFile(void *param);
void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid); void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid);
int vnodeSyncRetrieveFile(int vnode, int fd, uint32_t fileId, uint64_t *fmagic);
int vnodeSyncRestoreFile(int vnode, int sfd);
int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead); int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead);
int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery); int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery);
@ -477,14 +469,8 @@ void *vnodeGetMeterPeerConnection(SMeterObj *pObj, int index);
int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion); int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion);
void vnodeCloseAllSyncFds(int vnode);
void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]); void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]);
void vnodeStartSyncProcess(SVnodeObj *pVnode);
void vnodeCancelSync(int vnode);
void vnodeListPeerStatus(char *buffer); void vnodeListPeerStatus(char *buffer);
void vnodeCheckOwnStatus(SVnodeObj *pVnode); void vnodeCheckOwnStatus(SVnodeObj *pVnode);

View File

@ -113,7 +113,7 @@ int vnodeProcessCreateMeterRequest(char *pMsg) {
pVnode = vnodeList + vid; pVnode = vnodeList + vid;
if (pVnode->cfg.maxSessions <= 0) { if (pVnode->cfg.maxSessions <= 0) {
dError("vid:%d, not activated", vid); dError("vid:%d, not activated", vid);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _over; goto _over;
} }
@ -215,7 +215,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg) {
if (pVnode->pCachePool == NULL) { if (pVnode->pCachePool == NULL) {
dError("vid:%d is not activated yet", pCreate->vnode); dError("vid:%d is not activated yet", pCreate->vnode);
vnodeSendVpeerCfgMsg(pCreate->vnode); vnodeSendVpeerCfgMsg(pCreate->vnode);
code = TSDB_CODE_INVALID_SESSION_ID; code = TSDB_CODE_NOT_ACTIVE_SESSION;
goto _create_over; goto _create_over;
} }

View File

@ -44,7 +44,7 @@ void dnodeInitModules() {
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem; tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem; tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem; tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = tsEnableHttpModule ? -1 : 0; tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0; tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0; tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
@ -53,7 +53,7 @@ void dnodeInitModules() {
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem; tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem; tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem; tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = tsEnableMonitorModule ? -1 : 0; tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0; tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0; tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
} }

View File

@ -408,7 +408,6 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) {
char dpath[TSDB_FILENAME_LEN] = "\0"; char dpath[TSDB_FILENAME_LEN] = "\0";
int fileId; int fileId;
int ret; int ret;
int file_removed = 0;
close(pVnode->nfd); close(pVnode->nfd);
pVnode->nfd = 0; pVnode->nfd = 0;
@ -449,14 +448,15 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) {
dTrace("vid:%d, %s and %s is saved", pVnode->vnode, pVnode->cfn, pVnode->lfn); dTrace("vid:%d, %s and %s is saved", pVnode->vnode, pVnode->cfn, pVnode->lfn);
if (pVnode->numOfFiles > pVnode->maxFiles) { // Retention policy here
fileId = pVnode->fileId - pVnode->numOfFiles + 1; fileId = pVnode->fileId - pVnode->numOfFiles + 1;
int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
while (fileId <= cfile - pVnode->maxFiles) {
vnodeRemoveFile(pVnode->vnode, fileId); vnodeRemoveFile(pVnode->vnode, fileId);
pVnode->numOfFiles--; pVnode->numOfFiles--;
file_removed = 1; fileId++;
} }
if (!file_removed) vnodeUpdateFileMagic(pVnode->vnode, pVnode->commitFileId);
vnodeSaveAllMeterObjToFile(pVnode->vnode); vnodeSaveAllMeterObjToFile(pVnode->vnode);
return; return;

View File

@ -870,10 +870,13 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
payload = pSubmit->payLoad; payload = pSubmit->payLoad;
if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + *((TSKEY *)(payload))) { int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to import, key:%lld", int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, *(TSKEY *)(payload)); int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
return TSDB_CODE_OTHERS; if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) {
dError("vid:%d sid:%d id:%s, invalid timestamp to import, firstKey: %ld lastKey: %ld",
pObj->vnode, pObj->sid, pObj->meterId, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)));
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
} }
if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {