diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a1ae1e429d..a0f421212a 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -201,6 +201,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); +SWalRef *walRefFirstVer(SWal *, SWalRef *); SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1d5fae33eb..b195cfafb0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -521,7 +521,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqOffsetResetToData(&fetchOffsetNew, 0, 0); } } else { - tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); + pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); + if (pHandle->pRef == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -719,6 +724,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey); + taosWLockLatch(&pTq->pushLock); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index e86111109c..43470f4c82 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -77,14 +77,41 @@ void walUnrefVer(SWalRef *pRef) { } #endif -SWalRef *walRefCommittedVer(SWal *pWal) { - SWalRef *pRef = walOpenRef(pWal); +SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { if (pRef == NULL) { - return NULL; + pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } } taosThreadMutexLock(&pWal->mutex); - int64_t ver = walGetCommittedVer(pWal); + int64_t ver = walGetFirstVer(pWal); + + wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + return pRef; +} + +SWalRef *walRefCommittedVer(SWal *pWal) { + SWalRef *pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } + taosThreadMutexLock(&pWal->mutex); + + int64_t ver = walGetCommittedVer(pWal); + + wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver); pRef->refVer = ver; // bsearch in fileSet diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index cd4324a592..685693a709 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -33,6 +33,11 @@ #include //#define TM_YEAR_BASE 1970 //origin #define TM_YEAR_BASE 1900 // slguan + +// This magic number is the number of 100 nanosecond intervals since January 1, 1601 (UTC) +// until 00:00:00 January 1, 1970 +static const uint64_t TIMEEPOCH = ((uint64_t)116444736000000000ULL); + /* * We do not implement alternate representations. However, we always * check whether a given modifier is allowed for a certain conversion. @@ -341,15 +346,17 @@ char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm) { int32_t taosGetTimeOfDay(struct timeval *tv) { #ifdef WINDOWS - time_t t; - t = taosGetTimestampSec(); - SYSTEMTIME st; - GetLocalTime(&st); + LARGE_INTEGER t; + FILETIME f; - tv->tv_sec = (long)t; - tv->tv_usec = st.wMilliseconds * 1000; + GetSystemTimeAsFileTime(&f); + t.QuadPart = f.dwHighDateTime; + t.QuadPart <<= 32; + t.QuadPart |= f.dwLowDateTime; - return 0; + t.QuadPart -= TIMEEPOCH; + tv->tv_sec = t.QuadPart / 10000000; + tv->tv_usec = (t.QuadPart % 10000000) / 10; #else return gettimeofday(tv, NULL); #endif @@ -550,37 +557,13 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { #ifdef WINDOWS LARGE_INTEGER t; FILETIME f; - static FILETIME ff; - static SYSTEMTIME ss; - static LARGE_INTEGER offset; - - static int8_t offsetInit = 0; - static volatile bool offsetInitFinished = false; - int8_t old = atomic_val_compare_exchange_8(&offsetInit, 0, 1); - if (0 == old) { - ss.wYear = 1970; - ss.wMonth = 1; - ss.wDay = 1; - ss.wHour = 0; - ss.wMinute = 0; - ss.wSecond = 0; - ss.wMilliseconds = 0; - SystemTimeToFileTime(&ss, &ff); - offset.QuadPart = ff.dwHighDateTime; - offset.QuadPart <<= 32; - offset.QuadPart |= ff.dwLowDateTime; - offsetInitFinished = true; - } else { - while (!offsetInitFinished) - ; // Ensure initialization is completed. - } GetSystemTimeAsFileTime(&f); t.QuadPart = f.dwHighDateTime; t.QuadPart <<= 32; t.QuadPart |= f.dwLowDateTime; - t.QuadPart -= offset.QuadPart; + t.QuadPart -= TIMEEPOCH; pTS->tv_sec = t.QuadPart / 10000000; pTS->tv_nsec = (t.QuadPart % 10000000) * 100; return (0);