From 2a22e55705c49dbcc0170dc20511a3eb2e1bb033 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 1 Aug 2023 19:42:05 +0800 Subject: [PATCH 1/5] fix: null pointer check for mutex on windows --- source/os/src/osThread.c | 42 ++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 39ba92fdc5..59b1ab5b3e 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -17,6 +17,15 @@ #include #include "os.h" +#ifdef WINDOWS +#define THREAD_PTR_CHECK(p) \ + do { \ + if (!(p) || !(*(p))) return 0; \ + } while (0); +#else +#define THREAD_PTR_CHECK(p) +#endif + int32_t taosThreadCreate(TdThread *tid, const TdThreadAttr *attr, void *(*start)(void *), void *arg) { return pthread_create(tid, attr, start, arg); } @@ -83,9 +92,13 @@ int32_t taosThreadCondSignal(TdThreadCond *cond) { return pthread_cond_signal(co int32_t taosThreadCondBroadcast(TdThreadCond *cond) { return pthread_cond_broadcast(cond); } -int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { return pthread_cond_wait(cond, mutex); } +int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_cond_wait(cond, mutex); +} int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const struct timespec *abstime) { + THREAD_PTR_CHECK(mutex) return pthread_cond_timedwait(cond, mutex, abstime); } @@ -124,24 +137,37 @@ int32_t taosThreadKeyDelete(TdThreadKey key) { return pthread_key_delete(key); } int32_t taosThreadKill(TdThread thread, int32_t sig) { return pthread_kill(thread, sig); } // int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) { +// THREAD_PTR_CHECK(mutex) // return pthread_mutex_consistent(mutex); // } -int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { return pthread_mutex_destroy(mutex); } +int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_destroy(mutex); +} int32_t taosThreadMutexInit(TdThreadMutex *mutex, const TdThreadMutexAttr *attr) { return pthread_mutex_init(mutex, attr); } -int32_t taosThreadMutexLock(TdThreadMutex *mutex) { return pthread_mutex_lock(mutex); } +int32_t taosThreadMutexLock(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_lock(mutex); +} // int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) { // return pthread_mutex_timedlock(mutex, abstime); // } -int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { return pthread_mutex_trylock(mutex); } +int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_trylock(mutex); +} -int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { return pthread_mutex_unlock(mutex); } +int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { + THREAD_PTR_CHECK(mutex) + return pthread_mutex_unlock(mutex); +} int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) { return pthread_mutexattr_destroy(attr); } @@ -224,6 +250,7 @@ int32_t taosThreadSetSchedParam(TdThread thread, int32_t policy, const struct sc int32_t taosThreadSetSpecific(TdThreadKey key, const void *value) { return pthread_setspecific(key, value); } int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_destroy((pthread_mutex_t *)lock); #else @@ -242,6 +269,7 @@ int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) { } int32_t taosThreadSpinLock(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_lock((pthread_mutex_t *)lock); #else @@ -250,6 +278,7 @@ int32_t taosThreadSpinLock(TdThreadSpinlock *lock) { } int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_trylock((pthread_mutex_t *)lock); #else @@ -258,6 +287,7 @@ int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) { } int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) { + THREAD_PTR_CHECK(lock) #ifdef TD_USE_SPINLOCK_AS_MUTEX return pthread_mutex_unlock((pthread_mutex_t *)lock); #else @@ -267,4 +297,4 @@ int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) { void taosThreadTestCancel(void) { return pthread_testcancel(); } -void taosThreadClear(TdThread *thread) { memset(thread, 0, sizeof(TdThread)); } \ No newline at end of file +void taosThreadClear(TdThread *thread) { memset(thread, 0, sizeof(TdThread)); } From 42da90e459fb985012530a908ec116c054126319 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 1 Aug 2023 19:45:14 +0800 Subject: [PATCH 2/5] chore: code revert --- source/os/src/osThread.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/os/src/osThread.c b/source/os/src/osThread.c index 59b1ab5b3e..4c4e22bdd9 100644 --- a/source/os/src/osThread.c +++ b/source/os/src/osThread.c @@ -297,4 +297,4 @@ int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) { void taosThreadTestCancel(void) { return pthread_testcancel(); } -void taosThreadClear(TdThread *thread) { memset(thread, 0, sizeof(TdThread)); } +void taosThreadClear(TdThread *thread) { memset(thread, 0, sizeof(TdThread)); } \ No newline at end of file From 4ba4e83b5b7bc50829c831746a8bb6fe82928c80 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 1 Aug 2023 20:19:13 +0800 Subject: [PATCH 3/5] fix: retention file corruption --- source/dnode/vnode/src/tsdb/tsdbRetention.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 3af9d2a07a..a4d5715083 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -100,7 +100,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const .type = fobj->f->type, .did = did[0], .fid = fobj->f->fid, - .cid = rtner->cid, + .cid = fobj->f->cid, .size = fobj->f->size, .stt[0] = { From 2c357f1958e4880763fdd499844b7142c264e0d3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Aug 2023 11:18:35 +0800 Subject: [PATCH 4/5] fix(stream): enable filter table based on table statistics. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 69 ++++++++++++++------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index ea5b574ced..bbb5ce79fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -15,6 +15,7 @@ #include "tsdb.h" #include "tsdbFSet2.h" +#include "tsdbMerge.h" #include "tsdbReadUtil.h" #include "tsdbSttFileRW.h" @@ -352,10 +353,14 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray return TSDB_CODE_SUCCESS; } -static int32_t uidComparFn(const void *p1, const void *p2) { - const uint64_t *uid1 = p1; +static int32_t suidComparFn(const void *target, const void *p2) { + const uint64_t *targetUid = target; const uint64_t *uid2 = p2; - return (*uid1) - (*uid2); + if (*uid2 == (*targetUid)) { + return 0; + } else { + return (*targetUid) < (*uid2) ? -1:1; + } } static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid, @@ -372,17 +377,6 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 } } - // for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) { - // SStatisBlk *p = &pStatisBlkArray->data[i]; - // if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) { - // break; - // } - // - // if (p->maxTbid.uid < uid) { - // break; - // } - // } - if (i >= TARRAY2_SIZE(pStatisBlkArray)) { return false; } @@ -391,10 +385,39 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 STbStatisBlock block = {0}; tsdbSttFileReadStatisBlock(pReader, p, &block); - int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ); - tStatisBlockDestroy(&block); + int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ); + if (index == -1) { + tStatisBlockDestroy(&block); + return false; + } + + int32_t j = index; + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else if (block.uid->data[j] > uid) { + while (j >= 0 && block.suid->data[j] == suid) { + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else { + j -= 1; + } + } + } else { + j = index + 1; + while (j < block.suid->size && block.suid->data[j] == suid) { + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else { + j += 1; + } + } + } - return (index != -1); + tStatisBlockDestroy(&block); + return false; } int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, @@ -445,12 +468,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr); } - // bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader); - // if (!exists) { - // pIter->iSttBlk = -1; - // pIter->pSttBlk = NULL; - // return TSDB_CODE_SUCCESS; - // } + bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader); + if (!exists) { + pIter->iSttBlk = -1; + pIter->pSttBlk = NULL; + return TSDB_CODE_SUCCESS; + } // find the start block, actually we could load the position to avoid repeatly searching for the start position when // the skey is updated. From 0feffc687b3b80ce55f140d0fa08112e86d930d2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Aug 2023 12:44:45 +0800 Subject: [PATCH 5/5] fix(tsdb): check uid in multiple stt statistics blocks. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 70 ++++++++++++--------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index bbb5ce79fa..d74584f844 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -381,42 +381,50 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6 return false; } - SStatisBlk *p = &pStatisBlkArray->data[i]; - STbStatisBlock block = {0}; - tsdbSttFileReadStatisBlock(pReader, p, &block); + while(i < TARRAY2_SIZE(pStatisBlkArray)) { + SStatisBlk *p = &pStatisBlkArray->data[i]; + if (p->minTbid.suid > suid) { + return false; + } - int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ); - if (index == -1) { - tStatisBlockDestroy(&block); - return false; - } - - int32_t j = index; - if (block.uid->data[j] == uid) { - tStatisBlockDestroy(&block); - return true; - } else if (block.uid->data[j] > uid) { - while (j >= 0 && block.suid->data[j] == suid) { - if (block.uid->data[j] == uid) { - tStatisBlockDestroy(&block); - return true; - } else { - j -= 1; - } - } - } else { - j = index + 1; - while (j < block.suid->size && block.suid->data[j] == suid) { - if (block.uid->data[j] == uid) { - tStatisBlockDestroy(&block); - return true; - } else { - j += 1; + STbStatisBlock block = {0}; + tsdbSttFileReadStatisBlock(pReader, p, &block); + + int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ); + if (index == -1) { + tStatisBlockDestroy(&block); + return false; + } + + int32_t j = index; + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else if (block.uid->data[j] > uid) { + while (j >= 0 && block.suid->data[j] == suid) { + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else { + j -= 1; + } + } + } else { + j = index + 1; + while (j < block.suid->size && block.suid->data[j] == suid) { + if (block.uid->data[j] == uid) { + tStatisBlockDestroy(&block); + return true; + } else { + j += 1; + } } } + + tStatisBlockDestroy(&block); + i += 1; } - tStatisBlockDestroy(&block); return false; }