Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TD-21761
This commit is contained in:
commit
f3e199dda2
|
@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
||||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||||
|
|
||||||
|
SWalRef *walRefFirstVer(SWal *, SWalRef *);
|
||||||
SWalRef *walRefCommittedVer(SWal *);
|
SWalRef *walRefCommittedVer(SWal *);
|
||||||
|
|
||||||
SWalRef *walOpenRef(SWal *);
|
SWalRef *walOpenRef(SWal *);
|
||||||
|
|
|
@ -521,7 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
|
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||||
|
if (pHandle->pRef == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
@ -719,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
|
tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->pushLock);
|
||||||
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -77,14 +77,41 @@ void walUnrefVer(SWalRef *pRef) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SWalRef *walRefCommittedVer(SWal *pWal) {
|
SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
|
||||||
SWalRef *pRef = walOpenRef(pWal);
|
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
return NULL;
|
pRef = walOpenRef(pWal);
|
||||||
|
if (pRef == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
int64_t ver = walGetCommittedVer(pWal);
|
int64_t ver = walGetFirstVer(pWal);
|
||||||
|
|
||||||
|
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
|
pRef->refVer = ver;
|
||||||
|
// bsearch in fileSet
|
||||||
|
SWalFileInfo tmpInfo;
|
||||||
|
tmpInfo.firstVer = ver;
|
||||||
|
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||||
|
ASSERT(pRet != NULL);
|
||||||
|
pRef->refFile = pRet->firstVer;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return pRef;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWalRef *walRefCommittedVer(SWal *pWal) {
|
||||||
|
SWalRef *pRef = walOpenRef(pWal);
|
||||||
|
if (pRef == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
|
int64_t ver = walGetCommittedVer(pWal);
|
||||||
|
|
||||||
|
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||||
|
|
||||||
pRef->refVer = ver;
|
pRef->refVer = ver;
|
||||||
// bsearch in fileSet
|
// bsearch in fileSet
|
||||||
|
|
|
@ -33,6 +33,11 @@
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
//#define TM_YEAR_BASE 1970 //origin
|
//#define TM_YEAR_BASE 1970 //origin
|
||||||
#define TM_YEAR_BASE 1900 // slguan
|
#define TM_YEAR_BASE 1900 // slguan
|
||||||
|
|
||||||
|
// This magic number is the number of 100 nanosecond intervals since January 1, 1601 (UTC)
|
||||||
|
// until 00:00:00 January 1, 1970
|
||||||
|
static const uint64_t TIMEEPOCH = ((uint64_t)116444736000000000ULL);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We do not implement alternate representations. However, we always
|
* We do not implement alternate representations. However, we always
|
||||||
* check whether a given modifier is allowed for a certain conversion.
|
* check whether a given modifier is allowed for a certain conversion.
|
||||||
|
@ -341,15 +346,17 @@ char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) {
|
||||||
|
|
||||||
int32_t taosGetTimeOfDay(struct timeval *tv) {
|
int32_t taosGetTimeOfDay(struct timeval *tv) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
time_t t;
|
LARGE_INTEGER t;
|
||||||
t = taosGetTimestampSec();
|
FILETIME f;
|
||||||
SYSTEMTIME st;
|
|
||||||
GetLocalTime(&st);
|
|
||||||
|
|
||||||
tv->tv_sec = (long)t;
|
GetSystemTimeAsFileTime(&f);
|
||||||
tv->tv_usec = st.wMilliseconds * 1000;
|
t.QuadPart = f.dwHighDateTime;
|
||||||
|
t.QuadPart <<= 32;
|
||||||
|
t.QuadPart |= f.dwLowDateTime;
|
||||||
|
|
||||||
return 0;
|
t.QuadPart -= TIMEEPOCH;
|
||||||
|
tv->tv_sec = t.QuadPart / 10000000;
|
||||||
|
tv->tv_usec = (t.QuadPart % 10000000) / 10;
|
||||||
#else
|
#else
|
||||||
return gettimeofday(tv, NULL);
|
return gettimeofday(tv, NULL);
|
||||||
#endif
|
#endif
|
||||||
|
@ -550,37 +557,13 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
LARGE_INTEGER t;
|
LARGE_INTEGER t;
|
||||||
FILETIME f;
|
FILETIME f;
|
||||||
static FILETIME ff;
|
|
||||||
static SYSTEMTIME ss;
|
|
||||||
static LARGE_INTEGER offset;
|
|
||||||
|
|
||||||
static int8_t offsetInit = 0;
|
|
||||||
static volatile bool offsetInitFinished = false;
|
|
||||||
int8_t old = atomic_val_compare_exchange_8(&offsetInit, 0, 1);
|
|
||||||
if (0 == old) {
|
|
||||||
ss.wYear = 1970;
|
|
||||||
ss.wMonth = 1;
|
|
||||||
ss.wDay = 1;
|
|
||||||
ss.wHour = 0;
|
|
||||||
ss.wMinute = 0;
|
|
||||||
ss.wSecond = 0;
|
|
||||||
ss.wMilliseconds = 0;
|
|
||||||
SystemTimeToFileTime(&ss, &ff);
|
|
||||||
offset.QuadPart = ff.dwHighDateTime;
|
|
||||||
offset.QuadPart <<= 32;
|
|
||||||
offset.QuadPart |= ff.dwLowDateTime;
|
|
||||||
offsetInitFinished = true;
|
|
||||||
} else {
|
|
||||||
while (!offsetInitFinished)
|
|
||||||
; // Ensure initialization is completed.
|
|
||||||
}
|
|
||||||
|
|
||||||
GetSystemTimeAsFileTime(&f);
|
GetSystemTimeAsFileTime(&f);
|
||||||
t.QuadPart = f.dwHighDateTime;
|
t.QuadPart = f.dwHighDateTime;
|
||||||
t.QuadPart <<= 32;
|
t.QuadPart <<= 32;
|
||||||
t.QuadPart |= f.dwLowDateTime;
|
t.QuadPart |= f.dwLowDateTime;
|
||||||
|
|
||||||
t.QuadPart -= offset.QuadPart;
|
t.QuadPart -= TIMEEPOCH;
|
||||||
pTS->tv_sec = t.QuadPart / 10000000;
|
pTS->tv_sec = t.QuadPart / 10000000;
|
||||||
pTS->tv_nsec = (t.QuadPart % 10000000) * 100;
|
pTS->tv_nsec = (t.QuadPart % 10000000) * 100;
|
||||||
return (0);
|
return (0);
|
||||||
|
|
Loading…
Reference in New Issue