Merge pull request #12400 from taosdata/feature/tq

fix(wal): int overflow
This commit is contained in:
Liu Jicong 2022-05-12 21:33:25 +08:00 committed by GitHub
commit 2cc8ff312d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 33 deletions

View File

@ -485,8 +485,10 @@ static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
return -1; return -1;
} }
} }
// TODO: check ref
int32_t code = mndDropTopic(pMnode, pReq, pTopic); int32_t code = mndDropTopic(pMnode, pReq, pTopic);
// TODO: iterate and drop related subscriptions and offsets
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
if (code != 0) { if (code != 0) {

View File

@ -55,7 +55,7 @@ int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
} }
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int ret = 0; int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile; TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile; TdFilePtr pLogTFile = pRead->pReadLogTFile;
@ -68,14 +68,14 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
wError("failed to seek idx file, ver %ld, pos: %ld, since %s", ver, offset, terrstr()); wError("failed to seek idx file, ver %ld, pos: %ld, since %s", ver, offset, terrstr());
return -1; return -1;
} }
SWalIdxEntry entry; SWalIdxEntry entry = {0};
if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) { if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) {
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("failed to read idx file, since %s", terrstr()); wError("failed to read idx file, since %s", terrstr());
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
wError("read idx file incompletely, read bytes %d, bytes should be %lu", ret, sizeof(SWalIdxEntry)); wError("read idx file incompletely, read bytes %ld, bytes should be %lu", ret, sizeof(SWalIdxEntry));
} }
return -1; return -1;
} }
@ -187,7 +187,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
} }
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) { int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead) {
int32_t code; int64_t code;
ASSERT(pRead->curVersion == pHead->head.version); ASSERT(pRead->curVersion == pHead->head.version);

View File

@ -30,7 +30,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
} }
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
int code; int64_t code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;

View File

@ -310,9 +310,6 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
} }
int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock)); taosThreadRwlockRdlock(&(pFile->rwlock));
#endif #endif
@ -359,7 +356,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
assert(pFile->fd >= 0); // Please check if you have closed the file. assert(pFile->fd >= 0); // Please check if you have closed the file.
#ifdef WINDOWS #ifdef WINDOWS
size_t pos = lseek(pFile->fd, 0, SEEK_CUR); size_t pos = lseek(pFile->fd, 0, SEEK_CUR);
lseek(pFile->fd, (long)offset, SEEK_SET); lseek(pFile->fd, offset, SEEK_SET);
int64_t ret = read(pFile->fd, buf, count); int64_t ret = read(pFile->fd, buf, count);
lseek(pFile->fd, pos, SEEK_SET); lseek(pFile->fd, pos, SEEK_SET);
#else #else
@ -372,9 +369,6 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
} }
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock)); taosThreadRwlockWrlock(&(pFile->rwlock));
#endif #endif
@ -406,14 +400,11 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
} }
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock)); taosThreadRwlockRdlock(&(pFile->rwlock));
#endif #endif
assert(pFile->fd >= 0); // Please check if you have closed the file. assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t ret = lseek(pFile->fd, (long)offset, whence); int64_t ret = lseek(pFile->fd, offset, whence);
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockUnlock(&(pFile->rwlock)); taosThreadRwlockUnlock(&(pFile->rwlock));
#endif #endif
@ -424,9 +415,6 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
#ifdef WINDOWS #ifdef WINDOWS
return 0; return 0;
#else #else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file. assert(pFile->fd >= 0); // Please check if you have closed the file.
struct stat fileStat; struct stat fileStat;
@ -451,9 +439,6 @@ int32_t taosLockFile(TdFilePtr pFile) {
#ifdef WINDOWS #ifdef WINDOWS
return 0; return 0;
#else #else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file. assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB); return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB);
@ -464,9 +449,6 @@ int32_t taosUnLockFile(TdFilePtr pFile) {
#ifdef WINDOWS #ifdef WINDOWS
return 0; return 0;
#else #else
if (pFile == NULL) {
return 0;
}
assert(pFile->fd >= 0); // Please check if you have closed the file. assert(pFile->fd >= 0); // Please check if you have closed the file.
return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB); return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB);