commit
12ac393fb0
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tsdbFSet2.h"
|
#include "tsdbFSet2.h"
|
||||||
|
#include "tsdbMerge.h"
|
||||||
#include "tsdbReadUtil.h"
|
#include "tsdbReadUtil.h"
|
||||||
#include "tsdbSttFileRW.h"
|
#include "tsdbSttFileRW.h"
|
||||||
|
|
||||||
|
@ -352,10 +353,14 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t uidComparFn(const void *p1, const void *p2) {
|
static int32_t suidComparFn(const void *target, const void *p2) {
|
||||||
const uint64_t *uid1 = p1;
|
const uint64_t *targetUid = target;
|
||||||
const uint64_t *uid2 = p2;
|
const uint64_t *uid2 = p2;
|
||||||
return (*uid1) - (*uid2);
|
if (*uid2 == (*targetUid)) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return (*targetUid) < (*uid2) ? -1:1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
||||||
|
@ -372,29 +377,55 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
|
||||||
// SStatisBlk *p = &pStatisBlkArray->data[i];
|
|
||||||
// if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (p->maxTbid.uid < uid) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while(i < TARRAY2_SIZE(pStatisBlkArray)) {
|
||||||
SStatisBlk *p = &pStatisBlkArray->data[i];
|
SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||||
|
if (p->minTbid.suid > suid) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
STbStatisBlock block = {0};
|
STbStatisBlock block = {0};
|
||||||
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||||
|
|
||||||
int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ);
|
int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||||
|
if (index == -1) {
|
||||||
tStatisBlockDestroy(&block);
|
tStatisBlockDestroy(&block);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return (index != -1);
|
int32_t j = index;
|
||||||
|
if (block.uid->data[j] == uid) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return true;
|
||||||
|
} else if (block.uid->data[j] > uid) {
|
||||||
|
while (j >= 0 && block.suid->data[j] == suid) {
|
||||||
|
if (block.uid->data[j] == uid) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
j -= 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
j = index + 1;
|
||||||
|
while (j < block.suid->size && block.suid->data[j] == suid) {
|
||||||
|
if (block.uid->data[j] == uid) {
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
j += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tStatisBlockDestroy(&block);
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||||
|
@ -445,12 +476,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
||||||
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||||
// if (!exists) {
|
if (!exists) {
|
||||||
// pIter->iSttBlk = -1;
|
pIter->iSttBlk = -1;
|
||||||
// pIter->pSttBlk = NULL;
|
pIter->pSttBlk = NULL;
|
||||||
// return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||||
// the skey is updated.
|
// the skey is updated.
|
||||||
|
|
|
@ -100,7 +100,7 @@ static int32_t tsdbDoMigrateFileObj(SRTNer *rtner, const STFileObj *fobj, const
|
||||||
.type = fobj->f->type,
|
.type = fobj->f->type,
|
||||||
.did = did[0],
|
.did = did[0],
|
||||||
.fid = fobj->f->fid,
|
.fid = fobj->f->fid,
|
||||||
.cid = rtner->cid,
|
.cid = fobj->f->cid,
|
||||||
.size = fobj->f->size,
|
.size = fobj->f->size,
|
||||||
.stt[0] =
|
.stt[0] =
|
||||||
{
|
{
|
||||||
|
|
|
@ -17,6 +17,15 @@
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
|
#ifdef WINDOWS
|
||||||
|
#define THREAD_PTR_CHECK(p) \
|
||||||
|
do { \
|
||||||
|
if (!(p) || !(*(p))) return 0; \
|
||||||
|
} while (0);
|
||||||
|
#else
|
||||||
|
#define THREAD_PTR_CHECK(p)
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t taosThreadCreate(TdThread *tid, const TdThreadAttr *attr, void *(*start)(void *), void *arg) {
|
int32_t taosThreadCreate(TdThread *tid, const TdThreadAttr *attr, void *(*start)(void *), void *arg) {
|
||||||
return pthread_create(tid, attr, start, arg);
|
return pthread_create(tid, attr, start, arg);
|
||||||
}
|
}
|
||||||
|
@ -83,9 +92,13 @@ int32_t taosThreadCondSignal(TdThreadCond *cond) { return pthread_cond_signal(co
|
||||||
|
|
||||||
int32_t taosThreadCondBroadcast(TdThreadCond *cond) { return pthread_cond_broadcast(cond); }
|
int32_t taosThreadCondBroadcast(TdThreadCond *cond) { return pthread_cond_broadcast(cond); }
|
||||||
|
|
||||||
int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) { return pthread_cond_wait(cond, mutex); }
|
int32_t taosThreadCondWait(TdThreadCond *cond, TdThreadMutex *mutex) {
|
||||||
|
THREAD_PTR_CHECK(mutex)
|
||||||
|
return pthread_cond_wait(cond, mutex);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const struct timespec *abstime) {
|
int32_t taosThreadCondTimedWait(TdThreadCond *cond, TdThreadMutex *mutex, const struct timespec *abstime) {
|
||||||
|
THREAD_PTR_CHECK(mutex)
|
||||||
return pthread_cond_timedwait(cond, mutex, abstime);
|
return pthread_cond_timedwait(cond, mutex, abstime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,24 +137,37 @@ int32_t taosThreadKeyDelete(TdThreadKey key) { return pthread_key_delete(key); }
|
||||||
int32_t taosThreadKill(TdThread thread, int32_t sig) { return pthread_kill(thread, sig); }
|
int32_t taosThreadKill(TdThread thread, int32_t sig) { return pthread_kill(thread, sig); }
|
||||||
|
|
||||||
// int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) {
|
// int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) {
|
||||||
|
// THREAD_PTR_CHECK(mutex)
|
||||||
// return pthread_mutex_consistent(mutex);
|
// return pthread_mutex_consistent(mutex);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) { return pthread_mutex_destroy(mutex); }
|
int32_t taosThreadMutexDestroy(TdThreadMutex *mutex) {
|
||||||
|
THREAD_PTR_CHECK(mutex)
|
||||||
|
return pthread_mutex_destroy(mutex);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosThreadMutexInit(TdThreadMutex *mutex, const TdThreadMutexAttr *attr) {
|
int32_t taosThreadMutexInit(TdThreadMutex *mutex, const TdThreadMutexAttr *attr) {
|
||||||
return pthread_mutex_init(mutex, attr);
|
return pthread_mutex_init(mutex, attr);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosThreadMutexLock(TdThreadMutex *mutex) { return pthread_mutex_lock(mutex); }
|
int32_t taosThreadMutexLock(TdThreadMutex *mutex) {
|
||||||
|
THREAD_PTR_CHECK(mutex)
|
||||||
|
return pthread_mutex_lock(mutex);
|
||||||
|
}
|
||||||
|
|
||||||
// int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) {
|
// int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) {
|
||||||
// return pthread_mutex_timedlock(mutex, abstime);
|
// return pthread_mutex_timedlock(mutex, abstime);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) { return pthread_mutex_trylock(mutex); }
|
int32_t taosThreadMutexTryLock(TdThreadMutex *mutex) {
|
||||||
|
THREAD_PTR_CHECK(mutex)
|
||||||
|
return pthread_mutex_trylock(mutex);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) { return pthread_mutex_unlock(mutex); }
|
int32_t taosThreadMutexUnlock(TdThreadMutex *mutex) {
|
||||||
|
THREAD_PTR_CHECK(mutex)
|
||||||
|
return pthread_mutex_unlock(mutex);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) { return pthread_mutexattr_destroy(attr); }
|
int32_t taosThreadMutexAttrDestroy(TdThreadMutexAttr *attr) { return pthread_mutexattr_destroy(attr); }
|
||||||
|
|
||||||
|
@ -224,6 +250,7 @@ int32_t taosThreadSetSchedParam(TdThread thread, int32_t policy, const struct sc
|
||||||
int32_t taosThreadSetSpecific(TdThreadKey key, const void *value) { return pthread_setspecific(key, value); }
|
int32_t taosThreadSetSpecific(TdThreadKey key, const void *value) { return pthread_setspecific(key, value); }
|
||||||
|
|
||||||
int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) {
|
int32_t taosThreadSpinDestroy(TdThreadSpinlock *lock) {
|
||||||
|
THREAD_PTR_CHECK(lock)
|
||||||
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
||||||
return pthread_mutex_destroy((pthread_mutex_t *)lock);
|
return pthread_mutex_destroy((pthread_mutex_t *)lock);
|
||||||
#else
|
#else
|
||||||
|
@ -242,6 +269,7 @@ int32_t taosThreadSpinInit(TdThreadSpinlock *lock, int32_t pshared) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosThreadSpinLock(TdThreadSpinlock *lock) {
|
int32_t taosThreadSpinLock(TdThreadSpinlock *lock) {
|
||||||
|
THREAD_PTR_CHECK(lock)
|
||||||
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
||||||
return pthread_mutex_lock((pthread_mutex_t *)lock);
|
return pthread_mutex_lock((pthread_mutex_t *)lock);
|
||||||
#else
|
#else
|
||||||
|
@ -250,6 +278,7 @@ int32_t taosThreadSpinLock(TdThreadSpinlock *lock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) {
|
int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) {
|
||||||
|
THREAD_PTR_CHECK(lock)
|
||||||
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
||||||
return pthread_mutex_trylock((pthread_mutex_t *)lock);
|
return pthread_mutex_trylock((pthread_mutex_t *)lock);
|
||||||
#else
|
#else
|
||||||
|
@ -258,6 +287,7 @@ int32_t taosThreadSpinTrylock(TdThreadSpinlock *lock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) {
|
int32_t taosThreadSpinUnlock(TdThreadSpinlock *lock) {
|
||||||
|
THREAD_PTR_CHECK(lock)
|
||||||
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
#ifdef TD_USE_SPINLOCK_AS_MUTEX
|
||||||
return pthread_mutex_unlock((pthread_mutex_t *)lock);
|
return pthread_mutex_unlock((pthread_mutex_t *)lock);
|
||||||
#else
|
#else
|
||||||
|
|
Loading…
Reference in New Issue