From f28cf740b09fc4e0d539f3b722f7c5216e56a174 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Nov 2023 14:44:00 +0800 Subject: [PATCH 01/20] fix(stream): fix status check for stream-scan-history status when drop stream task msg is missing. --- include/libs/stream/tstream.h | 2 ++ source/dnode/mnode/impl/src/mndStream.c | 19 +++++++++++++++-- source/util/src/tdecompress.c | 27 +++++++++++++------------ 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cf9fc1d826..b8ceb31b80 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -664,6 +664,8 @@ typedef struct STaskStatusEntry { int32_t relatedHTask; // has related fill-history task int64_t activeCheckpointId; // current active checkpoint id bool checkpointFailed; // denote if the checkpoint is failed or not + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; double inputQUsed; // in MiB double inputRate; double sinkQuota; // existed quota size for sink task diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index db013017e3..5490d53587 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2790,7 +2790,7 @@ static SStreamTask* mndGetStreamTask(STaskId* pId, SStreamObj* pStream) { static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) { if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) { - if (fabs(pTaskEntry->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) { int32_t numOfReady = 0; int32_t numOfTotal = 0; for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { @@ -2933,6 +2933,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { bool snodeChanged = false; for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); @@ -2941,8 +2942,22 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); - if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true; + if(pTaskEntry->nodeId == SNODE_HANDLE) { + snodeChanged = true; + } } else { + // task is idle for more than 50 sec. + if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { + if (!pTaskEntry->inputQChanging) { + pTaskEntry->inputQUnchangeCounter++; + } else { + pTaskEntry->inputQChanging = false; + } + } else { + pTaskEntry->inputQChanging = true; + pTaskEntry->inputQUnchangeCounter = 0; + } + streamTaskStatusCopy(pTaskEntry, p); if (p->activeCheckpointId != 0) { if (activeCheckpointId != 0) { diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 68841941db..19f4e1fd92 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -53,11 +53,8 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, int64_t prevValue = 0; #if __AVX2__ - while (1) { - if (_pos == nelements) break; - - uint64_t w = 0; - memcpy(&w, ip, LONG_BYTES); + while (_pos < nelements) { + uint64_t w = *(uint64_t*) ip; char selector = (char)(w & INT64MASK(4)); // selector = 4 char bit = bit_per_integer[(int32_t)selector]; // bit = 3 @@ -261,17 +258,20 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen int64_t deltaOfDelta = 0; int32_t longBytes = LONG_BYTES; + // _mm_maskz_loadu_epi8 #if __AVX2__ - int32_t batch = nelements >> 2; - int32_t remainder = nelements & 0x1; + // _mm_blendv_epi8 + int32_t batch = nelements >> 4; + int32_t remainder = nelements & 0x03; - while (1) { + for(int32_t i = 0; i < batch; ++i) { uint8_t flags = input[ipos++]; // Decode dd1 uint64_t dd1 = 0; - nbytes = flags & INT8MASK(4); + nbytes = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + // __m128i mask = {};//[0], [] if (nbytes == 0) { deltaOfDelta = 0; @@ -289,10 +289,6 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen prevValue += prevDelta; ostream[opos++] = prevValue; - if (opos == nelements) { - return nelements * longBytes; - } - // Decode dd2 uint64_t dd2 = 0; nbytes = (flags >> 4) & INT8MASK(4); @@ -317,6 +313,11 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen return nelements * longBytes; } } + + if (remainder > 0) { + + } + #endif return 0; } \ No newline at end of file From 3deab3cfbfdfde63bf550f46bbf8d948e6c4c10f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Nov 2023 15:37:30 +0800 Subject: [PATCH 02/20] fix(stream):set failure for the related fill-history task. --- source/libs/stream/src/streamStart.c | 13 ++++++++++++- source/util/src/tdecompress.c | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 469813defc..ce18e770b1 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -471,6 +471,14 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); + streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaReleaseTask(pTask->pMeta, pHTask); + } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); @@ -1072,8 +1080,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { + if (numOfRecv == numOfTotal) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; @@ -1087,6 +1096,8 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); + } else { + stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } streamMetaWUnLock(pMeta); diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 19f4e1fd92..ac9d70b2e7 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -259,7 +259,7 @@ int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelemen int32_t longBytes = LONG_BYTES; // _mm_maskz_loadu_epi8 -#if __AVX2__ +#if __AVX512F__ // _mm_blendv_epi8 int32_t batch = nelements >> 4; From 788194c74890585fbbf3dcdbc83a93337e6b3db0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Nov 2023 22:53:55 +0800 Subject: [PATCH 03/20] fix(stream): fix deadlock. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 8 ++-- source/dnode/mnode/impl/src/mndStreamTrans.c | 41 ++++++++++++-------- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 244a6d08dd..f2b3ce6cac 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -52,7 +52,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb); -bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb); +bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5490d53587..81cb4703e1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1348,7 +1348,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); @@ -1831,7 +1831,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; @@ -1966,7 +1966,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; @@ -2754,7 +2754,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { break; } - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false); if (conflict) { mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name, pStream->sourceDb, pStream->targetDb); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 2345de290a..9dd9f64037 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -35,17 +35,15 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pS } int32_t clearFinishedTrans(SMnode* pMnode) { - SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); size_t keyLen = 0; + SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); + void* pIter = NULL; - taosThreadMutexLock(&execInfo.lock); - - void* pIter = NULL; while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { - SStreamTransInfo *pEntry = (SStreamTransInfo *)pIter; - STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); + SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter; // let's clear the finished trans + STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId); if (pTrans == NULL) { void* pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name @@ -60,44 +58,55 @@ int32_t clearFinishedTrans(SMnode* pMnode) { } size_t num = taosArrayGetSize(pList); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SKeyInfo* pKey = taosArrayGet(pList, i); taosHashRemove(execInfo.transMgmt.pDBTrans, pKey->pKey, pKey->keyLen); } - mDebug("clear %d finished stream-trans, remained:%d", (int32_t) num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); - taosThreadMutexUnlock(&execInfo.lock); + mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); terrno = TSDB_CODE_SUCCESS; taosArrayDestroy(pList); return 0; } -bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) { - clearFinishedTrans(pMnode); +bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) { + if (lock) { + taosThreadMutexLock(&execInfo.lock); + } - taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } return false; } + clearFinishedTrans(pMnode); + SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); if (pEntry != NULL) { - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); if (pEntry != NULL) { - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } + return false; } From 30c776cc0a2c82a2aafeb3f865caf8ffd575d310 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Nov 2023 00:54:32 +0800 Subject: [PATCH 04/20] fix(stream): reset the scheduler status. --- source/dnode/vnode/src/tq/tq.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9d16402ee6..821adc9953 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -2041,6 +2041,8 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id); + // clear the scheduler status + streamTaskSetSchedStatusInactive(pTask); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } From 9194c0c0cd929f1d78cddb7fbd532170d0756292 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Nov 2023 00:56:18 +0800 Subject: [PATCH 05/20] fix(stream): add logs. --- source/dnode/vnode/src/tq/tq.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 821adc9953..de8305e6d9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -2043,6 +2043,7 @@ int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) { // clear the scheduler status streamTaskSetSchedStatusInactive(pTask); + tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } From 1dd5cd17a04a4785026e48291c8088d1431850cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Nov 2023 23:53:05 +0800 Subject: [PATCH 06/20] fix(tsdb): add simd for decompress timestamp. --- cmake/cmake.define | 4 +- include/util/tcompression.h | 1 + source/util/src/tdecompress.c | 148 ++++++++++++++++++++++------------ 3 files changed, 100 insertions(+), 53 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 7710c071eb..3eb872cfee 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -181,8 +181,8 @@ ELSE () MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") MESSAGE(STATUS "avx512 supported by gcc") ENDIF() ENDIF() diff --git a/include/util/tcompression.h b/include/util/tcompression.h index ab0c22fc9b..7da3587f1c 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -139,6 +139,7 @@ int32_t getWordLength(char type); int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, char *const output, const char type); int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); +int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index ac9d70b2e7..b1c6df95c4 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -247,75 +247,121 @@ int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelemen return 0; } -int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { +int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; - int8_t nbytes = 0; - - int64_t prevValue = 0; - int64_t prevDelta = 0; - - int64_t deltaOfDelta = 0; - int32_t longBytes = LONG_BYTES; + __m128i prevVal = _mm_setzero_si128(); + __m128i prevDelta = _mm_setzero_si128(); // _mm_maskz_loadu_epi8 #if __AVX512F__ - // _mm_blendv_epi8 - int32_t batch = nelements >> 4; - int32_t remainder = nelements & 0x03; + int32_t batch = nelements >> 1; + int32_t remainder = nelements & 0x01; - for(int32_t i = 0; i < batch; ++i) { + int32_t i = 0; + if (batch > 1) { + // first loop uint8_t flags = input[ipos++]; - // Decode dd1 - uint64_t dd1 = 0; - nbytes = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 - // __m128i mask = {};//[0], [] + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - if (nbytes == 0) { - deltaOfDelta = 0; - } else { - if (bigEndian) { - memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); - } else { - memcpy(&dd1, input + ipos, nbytes); - } - deltaOfDelta = ZIGZAG_DECODE(int64_t, dd1); - } + __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; + __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); + __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + data2 = _mm_broadcastq_epi64(data2); - ipos += nbytes; - prevDelta += deltaOfDelta; - prevValue += prevDelta; - ostream[opos++] = prevValue; + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); - // Decode dd2 - uint64_t dd2 = 0; - nbytes = (flags >> 4) & INT8MASK(4); - if (nbytes == 0) { - deltaOfDelta = 0; - } else { - if (bigEndian) { - memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); - } else { - memcpy(&dd2, input + ipos, nbytes); - } - // zigzag_decoding - deltaOfDelta = ZIGZAG_DECODE(int64_t, dd2); - } + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); - ipos += nbytes; - prevDelta += deltaOfDelta; - prevValue += prevDelta; - ostream[opos++] = prevValue; + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); - if (opos == nelements) { - return nelements * longBytes; - } + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_set1_epi64x(val[1]); + + // keep the previous delta of delta, for the first item + prevDelta = _mm_set1_epi64x(deltaOfDelta[1]); + + opos += 2; + ipos += nbytes1 + nbytes2; + i += 1; + } + + // the remain + for(; i < batch; ++i) { + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + + __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; + __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); + __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + data2 = _mm_broadcastq_epi64(data2); + + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_set1_epi64x(val[1]); + + // keep the previous delta of delta + __m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta); + prevDelta = _mm_set1_epi64x(_mm_add_epi64(delta, prevDelta)[1]); + + opos += 2; + ipos += nbytes1 + nbytes2; } if (remainder > 0) { + uint64_t dd = 0; + uint8_t flags = input[ipos++]; + int32_t nbytes = flags & INT8MASK(4); + int64_t deltaOfDelta = 0; + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + // if (is_bigendian()) { + // memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + // } else { + memcpy(&dd, input + ipos, nbytes); + // } + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd); + } + + ipos += nbytes; + if (opos == 0) { + ostream[opos++] = deltaOfDelta; + } else { + int64_t prevV = prevVal[1]; + + int64_t prevDeltaX = deltaOfDelta + prevDelta[1]; + ostream[opos++] = prevV + prevDeltaX; + } } #endif From de0c20c296ae78e4e16ee2a869e1524614e9c0a2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 00:02:52 +0800 Subject: [PATCH 07/20] fix(test): add test case. --- source/util/test/decompressTest.cpp | 94 +++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 source/util/test/decompressTest.cpp diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp new file mode 100644 index 0000000000..0c4f660002 --- /dev/null +++ b/source/util/test/decompressTest.cpp @@ -0,0 +1,94 @@ +#include +#include +#include +#include + +namespace {} // namespace + +TEST(utilTest, decompress_test) { + int64_t tsList[10] = {1700000000, 1700000100, 1700000200, 1700000300, 1700000400, + 1700000500, 1700000600, 1700000700, 1700000800, 1700000900}; + + char* pOutput[10 * sizeof(int64_t)] = {0}; + int32_t len = tsCompressTimestamp(tsList, sizeof(tsList), sizeof(tsList) / sizeof(tsList[0]), pOutput, 10, ONE_STAGE_COMP, NULL, 0); + + char* decompOutput[10 * 8] = {0}; + tsDecompressTimestamp(pOutput, len, 10, decompOutput, sizeof(int64_t)*10, ONE_STAGE_COMP, NULL, 0); + + for(int32_t i = 0; i < 10; ++i) { + std::cout<< ((int64_t*)decompOutput)[i] << std::endl; + } + + memset(decompOutput, 0, 10*8); + tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 10, + reinterpret_cast(decompOutput), false); + + for(int32_t i = 0; i < 10; ++i) { + std::cout<<((int64_t*)decompOutput)[i] << std::endl; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + int64_t tsList1[7] = {1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000000, 1700000900}; + int32_t len1 = tsCompressTimestamp(tsList1, sizeof(tsList1), sizeof(tsList1) / sizeof(tsList1[0]), pOutput, 7, ONE_STAGE_COMP, NULL, 0); + + memset(decompOutput, 0, 10*8); + tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 7, + reinterpret_cast(decompOutput), false); + + for(int32_t i = 0; i < 7; ++i) { + std::cout<<((int64_t*)decompOutput)[i] << std::endl; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + int64_t tsList2[1] = {1700000000}; + int32_t len2 = tsCompressTimestamp(tsList2, sizeof(tsList2), sizeof(tsList2) / sizeof(tsList2[0]), pOutput, 1, ONE_STAGE_COMP, NULL, 0); + + memset(decompOutput, 0, 10*8); + tsDecompressTimestampAvx512(reinterpret_cast(pOutput), 1, + reinterpret_cast(decompOutput), false); + + for(int32_t i = 0; i < 1; ++i) { + std::cout<<((int64_t*)decompOutput)[i] << std::endl; + } +} + +TEST(utilTest, decompress_perf_test) { + int32_t num = 100000; + + int64_t* pList = static_cast(taosMemoryCalloc(num, sizeof(int64_t))); + int64_t iniVal = 1700000000; + + uint32_t v = 100; + + for(int32_t i = 0; i < num; ++i) { + iniVal += taosRandR(&v)%10; + pList[i] = iniVal; + } + + char* px = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); + int32_t len = tsCompressTimestamp(pList, num * sizeof(int64_t), num, px, num, ONE_STAGE_COMP, NULL, 0); + + char* pOutput = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); + + int64_t st = taosGetTimestampUs(); + for(int32_t k = 0; k < 10; ++k) { + tsDecompressTimestamp(px, len, num, pOutput, sizeof(int64_t) * num, ONE_STAGE_COMP, NULL, 0); + } + + int64_t el1 = taosGetTimestampUs() - st; + std::cout << "decompress elapsed time:" << el1 << " us" << std::endl; + + memset(pOutput, 0, num * sizeof(int64_t)); + st = taosGetTimestampUs(); + for(int32_t k = 0; k < 10; ++k) { + tsDecompressTimestampAvx512(px, num, pOutput, false); + } + + int64_t el2 = taosGetTimestampUs() - st; + std::cout << "SIMD decompress elapsed time:" << el2 << " us" << std::endl; + + taosMemoryFree(pList); + taosMemoryFree(pOutput); + taosMemoryFree(px); +} + From 6ab14d39bbf63619e3c92eba8366acb5c20713c0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 09:32:47 +0800 Subject: [PATCH 08/20] fix(test): do some internal refactor. --- cmake/cmake.define | 19 +++++++++++++------ source/util/src/tdecompress.c | 11 ++++------- source/util/test/decompressTest.cpp | 6 +++--- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 3eb872cfee..7cc33b8252 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE FALSE) +set(CMAKE_VERBOSE_MAKEFILE TRUE) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory @@ -159,6 +159,7 @@ ELSE () CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2) CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F) CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI) + CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL) IF (COMPILER_SUPPORT_SSE42) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") @@ -166,11 +167,11 @@ ELSE () ENDIF() IF ("${SIMD_SUPPORT}" MATCHES "true") - IF (COMPILER_SUPPORT_FMA) + IF (COMPILER_SUPPORT_FMA) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfma") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mfma") - ENDIF() - IF (COMPILER_SUPPORT_AVX) + ENDIF() + IF (COMPILER_SUPPORT_AVX) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx") ENDIF() @@ -182,8 +183,14 @@ ELSE () IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") - MESSAGE(STATUS "avx512 supported by gcc") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") + MESSAGE(STATUS "avx512f/avx512bmi supported by gcc") + ENDIF() + + IF (COMPILER_SUPPORT_AVX512VL) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512vl") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512vl") + MESSAGE(STATUS "avx512vl supported by gcc") ENDIF() ENDIF() diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index b1c6df95c4..b39a340ac6 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -111,7 +111,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, __m256i signmask = _mm256_and_si256(_mm256_set1_epi64x(1), zigzagVal); signmask = _mm256_sub_epi64(_mm256_setzero_si256(), signmask); - // get the four zigzag values here + // get four zigzag values here __m256i delta = _mm256_xor_si256(_mm256_srli_epi64(zigzagVal, 1), signmask); // calculate the cumulative sum (prefix sum) for each number @@ -254,10 +254,11 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem __m128i prevDelta = _mm_setzero_si128(); // _mm_maskz_loadu_epi8 -#if __AVX512F__ +#if __AVX512VL__ int32_t batch = nelements >> 1; int32_t remainder = nelements & 0x01; + __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; int32_t i = 0; if (batch > 1) { @@ -267,7 +268,6 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); data2 = _mm_broadcastq_epi64(data2); @@ -305,7 +305,6 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); data2 = _mm_broadcastq_epi64(data2); @@ -357,10 +356,8 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem if (opos == 0) { ostream[opos++] = deltaOfDelta; } else { - int64_t prevV = prevVal[1]; - int64_t prevDeltaX = deltaOfDelta + prevDelta[1]; - ostream[opos++] = prevV + prevDeltaX; + ostream[opos++] = prevVal[1] + prevDeltaX; } } diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp index 0c4f660002..378b67edbb 100644 --- a/source/util/test/decompressTest.cpp +++ b/source/util/test/decompressTest.cpp @@ -53,7 +53,7 @@ TEST(utilTest, decompress_test) { } TEST(utilTest, decompress_perf_test) { - int32_t num = 100000; + int32_t num = 10000; int64_t* pList = static_cast(taosMemoryCalloc(num, sizeof(int64_t))); int64_t iniVal = 1700000000; @@ -71,7 +71,7 @@ TEST(utilTest, decompress_perf_test) { char* pOutput = static_cast(taosMemoryMalloc(num * sizeof(int64_t))); int64_t st = taosGetTimestampUs(); - for(int32_t k = 0; k < 10; ++k) { + for(int32_t k = 0; k < 10000; ++k) { tsDecompressTimestamp(px, len, num, pOutput, sizeof(int64_t) * num, ONE_STAGE_COMP, NULL, 0); } @@ -80,7 +80,7 @@ TEST(utilTest, decompress_perf_test) { memset(pOutput, 0, num * sizeof(int64_t)); st = taosGetTimestampUs(); - for(int32_t k = 0; k < 10; ++k) { + for(int32_t k = 0; k < 10000; ++k) { tsDecompressTimestampAvx512(px, num, pOutput, false); } From 5de59a354787b2f941bdd626238ecbde17592cc8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 10:32:47 +0800 Subject: [PATCH 09/20] enh(sys): add check for avx512vl --- cmake/cmake.define | 4 +- source/util/src/tcompression.c | 101 +++++++++++++++++---------------- 2 files changed, 53 insertions(+), 52 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 7cc33b8252..c685ba6161 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -184,13 +184,13 @@ ELSE () IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") - MESSAGE(STATUS "avx512f/avx512bmi supported by gcc") + MESSAGE(STATUS "avx512f/avx512bmi supported by compiler") ENDIF() IF (COMPILER_SUPPORT_AVX512VL) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512vl") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512vl") - MESSAGE(STATUS "avx512vl supported by gcc") + MESSAGE(STATUS "avx512vl supported by compiler") ENDIF() ENDIF() diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 09f50d0f6d..8a79d62cda 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -538,65 +538,66 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement memcpy(output, input + 1, nelements * longBytes); return nelements * longBytes; } else if (input[0] == 1) { // Decompress - int64_t *ostream = (int64_t *)output; + if (tsSIMDEnable && tsAVX512Enable) { + tsDecompressTimestampAvx512(input, nelements, output, false); + } else { + int64_t *ostream = (int64_t *)output; - int32_t ipos = 1, opos = 0; - int8_t nbytes = 0; - int64_t prev_value = 0; - int64_t prev_delta = 0; - int64_t delta_of_delta = 0; + int32_t ipos = 1, opos = 0; + int8_t nbytes = 0; + int64_t prev_value = 0; + int64_t prev_delta = 0; + int64_t delta_of_delta = 0; - while (1) { - uint8_t flags = input[ipos++]; - // Decode dd1 - uint64_t dd1 = 0; - nbytes = flags & INT8MASK(4); - if (nbytes == 0) { - delta_of_delta = 0; - } else { - if (is_bigendian()) { - memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + while (1) { + uint8_t flags = input[ipos++]; + // Decode dd1 + uint64_t dd1 = 0; + nbytes = flags & INT8MASK(4); + if (nbytes == 0) { + delta_of_delta = 0; } else { - memcpy(&dd1, input + ipos, nbytes); + if (is_bigendian()) { + memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd1, input + ipos, nbytes); + } + delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); } - delta_of_delta = ZIGZAG_DECODE(int64_t, dd1); - } - ipos += nbytes; - if (opos == 0) { - prev_value = delta_of_delta; - prev_delta = 0; - ostream[opos++] = delta_of_delta; - } else { + + ipos += nbytes; + if (opos == 0) { + prev_value = delta_of_delta; + prev_delta = 0; + ostream[opos++] = delta_of_delta; + } else { + prev_delta = delta_of_delta + prev_delta; + prev_value = prev_value + prev_delta; + ostream[opos++] = prev_value; + } + if (opos == nelements) return nelements * longBytes; + + // Decode dd2 + uint64_t dd2 = 0; + nbytes = (flags >> 4) & INT8MASK(4); + if (nbytes == 0) { + delta_of_delta = 0; + } else { + if (is_bigendian()) { + memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); + } else { + memcpy(&dd2, input + ipos, nbytes); + } + // zigzag_decoding + delta_of_delta = ZIGZAG_DECODE(int64_t, dd2); + } + ipos += nbytes; prev_delta = delta_of_delta + prev_delta; prev_value = prev_value + prev_delta; ostream[opos++] = prev_value; + if (opos == nelements) return nelements * longBytes; } - if (opos == nelements) return nelements * longBytes; - - // Decode dd2 - uint64_t dd2 = 0; - nbytes = (flags >> 4) & INT8MASK(4); - if (nbytes == 0) { - delta_of_delta = 0; - } else { - if (is_bigendian()) { - memcpy(((char *)(&dd2)) + longBytes - nbytes, input + ipos, nbytes); - } else { - memcpy(&dd2, input + ipos, nbytes); - } - // zigzag_decoding - delta_of_delta = ZIGZAG_DECODE(int64_t, dd2); - } - ipos += nbytes; - prev_delta = delta_of_delta + prev_delta; - prev_value = prev_value + prev_delta; - ostream[opos++] = prev_value; - if (opos == nelements) return nelements * longBytes; } - - } else { - ASSERT(0); - return -1; } } From f9d717d0f2fdc7b0a9917a29831d20e6f021eabd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 14:42:13 +0800 Subject: [PATCH 10/20] refactor: do some internal refactor. --- cmake/cmake.define | 4 +- include/util/tcompression.h | 1 + source/util/src/tcompression.c | 4 + source/util/src/tdecompress.c | 165 ++++++++++++++++++++++++++-- source/util/test/decompressTest.cpp | 2 +- 5 files changed, 164 insertions(+), 12 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index c685ba6161..7db6baafab 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -182,8 +182,8 @@ ELSE () MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi -mavx512vl") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi") MESSAGE(STATUS "avx512f/avx512bmi supported by compiler") ENDIF() diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 7da3587f1c..75ddbb12e7 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -140,6 +140,7 @@ int32_t tsDecompressIntImpl_Hw(const char *const input, const int32_t nelements, int32_t tsDecompressFloatImplAvx512(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelements, char *const output); int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian); +int32_t tsDecompressTimestampAvx2(const char* const input, const int32_t nelements, char *const output, bool bigEndian); /************************************************************************* * STREAM COMPRESSION diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 8a79d62cda..3cc00ddc7f 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -540,6 +540,8 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement } else if (input[0] == 1) { // Decompress if (tsSIMDEnable && tsAVX512Enable) { tsDecompressTimestampAvx512(input, nelements, output, false); + } else if (tsSIMDEnable && tsAVX2Enable) { + tsDecompressTimestampAvx2(input, nelements, output, false); } else { int64_t *ostream = (int64_t *)output; @@ -599,6 +601,8 @@ int32_t tsDecompressTimestampImp(const char *const input, const int32_t nelement } } } + + return nelements * longBytes; } /* --------------------------------------------Double Compression ---------------------------------------------- */ diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index b39a340ac6..5a5e60093c 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -247,15 +247,14 @@ int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelemen return 0; } -int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { +int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, + bool bigEndian) { int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; __m128i prevVal = _mm_setzero_si128(); __m128i prevDelta = _mm_setzero_si128(); - // _mm_maskz_loadu_epi8 -#if __AVX512VL__ - +#if __AVX2__ int32_t batch = nelements >> 1; int32_t remainder = nelements & 0x01; __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; @@ -268,10 +267,22 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); - __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); - data2 = _mm_broadcastq_epi64(data2); + __m128i data1; + if (nbytes1 == 0) { + data1 = _mm_setzero_si128(); + } else { +// _mm_shuffle_epi8() + memcpy(&data1, (const void*) (input + ipos), nbytes1); + } + __m128i data2; + if (nbytes2 == 0) { + data2 = _mm_setzero_si128(); + } else { + memcpy(&data2, (const void*) (input + ipos + nbytes1), nbytes2); + } + + data2 = _mm_broadcastq_epi64(data2); __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) @@ -305,8 +316,26 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 int8_t nbytes2 = (flags >> 4) & INT8MASK(4); - __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); - __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); +// __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); +// __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + __m128i data1; + if (nbytes1 == 0) { + data1 = _mm_setzero_si128(); + } else { + int64_t dd = 0; + memcpy(&dd, (const void*) (input + ipos), nbytes1); + data1 = _mm_loadu_si64(&dd); + } + + __m128i data2; + if (nbytes2 == 0) { + data2 = _mm_setzero_si128(); + } else { + int64_t dd = 0; + memcpy(&dd, (const void*) (input + ipos + nbytes1), nbytes2); + data2 = _mm_loadu_si64(&dd); + } + data2 = _mm_broadcastq_epi64(data2); __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); @@ -335,6 +364,124 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem ipos += nbytes1 + nbytes2; } + if (remainder > 0) { + uint64_t dd = 0; + uint8_t flags = input[ipos++]; + + int32_t nbytes = flags & INT8MASK(4); + int64_t deltaOfDelta = 0; + if (nbytes == 0) { + deltaOfDelta = 0; + } else { + // if (is_bigendian()) { + // memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); + // } else { + memcpy(&dd, input + ipos, nbytes); + // } + deltaOfDelta = ZIGZAG_DECODE(int64_t, dd); + } + + ipos += nbytes; + if (opos == 0) { + ostream[opos++] = deltaOfDelta; + } else { + int64_t prevDeltaX = deltaOfDelta + prevDelta[1]; + ostream[opos++] = prevVal[1] + prevDeltaX; + } + } +#endif + + return 0; +} + +int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { + int64_t *ostream = (int64_t *)output; + int32_t ipos = 1, opos = 0; + __m128i prevVal = _mm_setzero_si128(); + __m128i prevDelta = _mm_setzero_si128(); + + // _mm_maskz_loadu_epi8 +#if __AVX512VL__ + + int32_t batch = nelements >> 1; + int32_t remainder = nelements & 0x01; + __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; + + int32_t i = 0; + if (batch > 1) { + // first loop + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + + __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); + __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + data2 = _mm_broadcastq_epi64(data2); + + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_shuffle_epi32 (val, 0xEE); + + // keep the previous delta of delta, for the first item + prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE); + + opos += 2; + ipos += nbytes1 + nbytes2; + i += 1; + } + + // the remain + for(; i < batch; ++i) { + uint8_t flags = input[ipos++]; + + int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 + int8_t nbytes2 = (flags >> 4) & INT8MASK(4); + + __m128i data1 = _mm_maskz_loadu_epi8(mask2[nbytes1], (const void*)(input + ipos)); + __m128i data2 = _mm_maskz_loadu_epi8(mask2[nbytes2], (const void*)(input + ipos + nbytes1)); + data2 = _mm_broadcastq_epi64(data2); + + __m128i zzVal = _mm_blend_epi32(data2, data1, 0x03); + + // ZIGZAG_DECODE(T, v) (((v) >> 1) ^ -((T)((v)&1))) + __m128i signmask = _mm_and_si128(_mm_set1_epi64x(1), zzVal); + signmask = _mm_sub_epi64(_mm_setzero_si128(), signmask); + + // get two zigzag values here + __m128i deltaOfDelta = _mm_xor_si128(_mm_srli_epi64(zzVal, 1), signmask); + + __m128i deltaCurrent = _mm_add_epi64(deltaOfDelta, prevDelta); + deltaCurrent = _mm_add_epi64(_mm_slli_si128(deltaCurrent, 8), deltaCurrent); + + __m128i val = _mm_add_epi64(deltaCurrent, prevVal); + _mm_storeu_si128((__m128i *)&ostream[opos], val); + + // keep the previous value + prevVal = _mm_shuffle_epi32 (val, 0xEE); + + // keep the previous delta of delta + __m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta); + prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE); + + opos += 2; + ipos += nbytes1 + nbytes2; + } + if (remainder > 0) { uint64_t dd = 0; uint8_t flags = input[ipos++]; diff --git a/source/util/test/decompressTest.cpp b/source/util/test/decompressTest.cpp index 378b67edbb..caf8df3ba8 100644 --- a/source/util/test/decompressTest.cpp +++ b/source/util/test/decompressTest.cpp @@ -76,7 +76,7 @@ TEST(utilTest, decompress_perf_test) { } int64_t el1 = taosGetTimestampUs() - st; - std::cout << "decompress elapsed time:" << el1 << " us" << std::endl; + std::cout << "soft decompress elapsed time:" << el1 << " us" << std::endl; memset(pOutput, 0, num * sizeof(int64_t)); st = taosGetTimestampUs(); From d1193b5a7a29d3df3811d77e6bdd5d5ec84f8721 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 14:59:12 +0800 Subject: [PATCH 11/20] refactor: opt simd perf. --- source/util/src/tdecompress.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 5a5e60093c..509afb6ed4 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -299,10 +299,10 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen _mm_storeu_si128((__m128i *)&ostream[opos], val); // keep the previous value - prevVal = _mm_set1_epi64x(val[1]); + prevVal = _mm_shuffle_epi32 (val, 0xEE); // keep the previous delta of delta, for the first item - prevDelta = _mm_set1_epi64x(deltaOfDelta[1]); + prevDelta = _mm_shuffle_epi32(deltaOfDelta, 0xEE); opos += 2; ipos += nbytes1 + nbytes2; @@ -354,11 +354,11 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen _mm_storeu_si128((__m128i *)&ostream[opos], val); // keep the previous value - prevVal = _mm_set1_epi64x(val[1]); + prevVal = _mm_shuffle_epi32 (val, 0xEE); // keep the previous delta of delta __m128i delta = _mm_add_epi64(_mm_slli_si128(deltaOfDelta, 8), deltaOfDelta); - prevDelta = _mm_set1_epi64x(_mm_add_epi64(delta, prevDelta)[1]); + prevDelta = _mm_shuffle_epi32(_mm_add_epi64(delta, prevDelta), 0xEE); opos += 2; ipos += nbytes1 + nbytes2; From 61111385676a910cc9b956fc111c7858da2322f3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 17:23:34 +0800 Subject: [PATCH 12/20] fix(stream): fix error. --- source/libs/stream/src/streamExec.c | 18 +++++++++--------- source/util/src/tdecompress.c | 10 ++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8b14846414..e0ee01d345 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -252,14 +252,15 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.pExecutor; - bool finished = false; + void* exec = pTask->exec.pExecutor; + bool finished = false; + const char* id = pTask->id.idStr; qSetStreamOpOpen(exec); while (1) { if (streamTaskShouldPause(pTask)) { - stDebug("s-task:%s paused from the scan-history task", pTask->id.idStr); + stDebug("s-task:%s paused from the scan-history task", id); // quit from step1, not continue to handle the step2 return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; } @@ -267,8 +268,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", pTask->id.idStr, - tstrerror(terrno)); + stError("s-task:%s scan-history prepare result block failed, code:%s, retry later", id, tstrerror(terrno)); continue; } @@ -295,8 +295,8 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { - stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", - pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); + stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, + pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } @@ -543,7 +543,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. */ -int32_t streamExecForAll(SStreamTask* pTask) { +int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; // merge multiple input data if possible in the input queue. @@ -654,7 +654,7 @@ int32_t streamExecTask(SStreamTask* pTask) { int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); if (schedStatus == TASK_SCHED_STATUS__WAITING) { while (1) { - int32_t code = streamExecForAll(pTask); + int32_t code = doStreamExecTask(pTask); if (code < 0) { // todo this status should be removed atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 509afb6ed4..bb22772f7d 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -271,7 +271,6 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen if (nbytes1 == 0) { data1 = _mm_setzero_si128(); } else { -// _mm_shuffle_epi8() memcpy(&data1, (const void*) (input + ipos), nbytes1); } @@ -400,15 +399,14 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem __m128i prevVal = _mm_setzero_si128(); __m128i prevDelta = _mm_setzero_si128(); - // _mm_maskz_loadu_epi8 #if __AVX512VL__ - int32_t batch = nelements >> 1; - int32_t remainder = nelements & 0x01; + int32_t numOfBatch = nelements >> 1; + int32_t remainder = nelements & 0x01; __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; int32_t i = 0; - if (batch > 1) { + if (numOfBatch > 1) { // first loop uint8_t flags = input[ipos++]; @@ -446,7 +444,7 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem } // the remain - for(; i < batch; ++i) { + for(; i < numOfBatch; ++i) { uint8_t flags = input[ipos++]; int8_t nbytes1 = flags & INT8MASK(4); // range of nbytes starts from 0 to 7 From c5aba6089951ebd69af31ec09a7191c34ea5a6c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Nov 2023 09:28:43 +0800 Subject: [PATCH 13/20] fix(stream): disable the time slice usage limitation for stream task. --- source/libs/stream/src/streamExec.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e0ee01d345..8ab8f3852e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -281,12 +281,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } // dispatch the generated results - int32_t code = handleResultBlocks(pTask, pRes, size); + /*int32_t code = */handleResultBlocks(pTask, pRes, size); int64_t el = taosGetTimestampMs() - st; // downstream task input queue is full, try in 5sec - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; } @@ -294,7 +294,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; } - if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { + if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; From 9649db9003829a45a65b8dba14e3c63544692552 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 18:49:22 +0800 Subject: [PATCH 14/20] fix(stream): fix invalid free. --- source/dnode/mnode/impl/src/mndStream.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 81cb4703e1..17c8dfe9cb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -755,13 +755,14 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / } static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { - SMnode * pMnode = pReq->info.node; - int32_t code = -1; - SStreamObj * pStream = NULL; - SDbObj * pDb = NULL; - SCMCreateStreamReq createStreamReq = {0}; - SStreamObj streamObj = {0}; + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SStreamObj *pStream = NULL; + SStreamObj streamObj = {0}; + char *sql = NULL; + int32_t sqlLen = 0; + SCMCreateStreamReq createStreamReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto _OVER; @@ -792,8 +793,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - char* sql = NULL; - int32_t sqlLen = 0; if(createStreamReq.sql != NULL){ sqlLen = strlen(createStreamReq.sql); sql = taosMemoryMalloc(sqlLen + 1); From 6bb30822c2711315dbb480037db337e8a06a62af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Nov 2023 18:51:30 +0800 Subject: [PATCH 15/20] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStream.c | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 17c8dfe9cb..feb0c3e52a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -793,7 +793,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - if(createStreamReq.sql != NULL){ + if (createStreamReq.sql != NULL) { sqlLen = strlen(createStreamReq.sql); sql = taosMemoryMalloc(sqlLen + 1); memset(sql, 0, sqlLen + 1); @@ -880,14 +880,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // reuse this function for stream if (sql != NULL && sqlLen > 0) { - auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, - sqlLen); - } - else{ + auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, sql, sqlLen); + } else { char detail[1000] = {0}; sprintf(detail, "dbname:%s, stream name:%s", dbname.dbname, name.dbname); auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, detail, strlen(detail)); } + _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); @@ -896,7 +895,7 @@ _OVER: mndReleaseStream(pMnode, pStream); tFreeSCMCreateStreamReq(&createStreamReq); tFreeStreamObj(&streamObj); - if(sql != NULL){ + if (sql != NULL) { taosMemoryFreeClear(sql); } return code; From 2eec29861c6e5652d6496474e749315edbdee342 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Nov 2023 10:06:21 +0800 Subject: [PATCH 16/20] fix(util): disable avx2 version decompress timestamp. --- source/util/src/tdecompress.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index bb22772f7d..9c17000c50 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -249,6 +249,7 @@ int32_t tsDecompressFloatImplAvx2(const char *const input, const int32_t nelemen int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelements, char *const output, bool bigEndian) { +#if 0 int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; __m128i prevVal = _mm_setzero_si128(); @@ -389,11 +390,12 @@ int32_t tsDecompressTimestampAvx2(const char *const input, const int32_t nelemen } } #endif - +#endif return 0; } -int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelements, char *const output, bool bigEndian) { +int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelements, char *const output, + bool UNUSED_PARAM(bigEndian)) { int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; __m128i prevVal = _mm_setzero_si128(); @@ -489,11 +491,7 @@ int32_t tsDecompressTimestampAvx512(const char* const input, const int32_t nelem if (nbytes == 0) { deltaOfDelta = 0; } else { - // if (is_bigendian()) { - // memcpy(((char *)(&dd1)) + longBytes - nbytes, input + ipos, nbytes); - // } else { memcpy(&dd, input + ipos, nbytes); - // } deltaOfDelta = ZIGZAG_DECODE(int64_t, dd); } From f4cf898af73c7728b5dc2ea7e7124f6e4854e0f6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Nov 2023 13:59:13 +0800 Subject: [PATCH 17/20] fix(util): fix syntax error. --- source/util/src/tdecompress.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/util/src/tdecompress.c b/source/util/src/tdecompress.c index 9c17000c50..f212bf5231 100644 --- a/source/util/src/tdecompress.c +++ b/source/util/src/tdecompress.c @@ -398,11 +398,12 @@ int32_t tsDecompressTimestampAvx512(const char *const input, const int32_t nelem bool UNUSED_PARAM(bigEndian)) { int64_t *ostream = (int64_t *)output; int32_t ipos = 1, opos = 0; - __m128i prevVal = _mm_setzero_si128(); - __m128i prevDelta = _mm_setzero_si128(); #if __AVX512VL__ + __m128i prevVal = _mm_setzero_si128(); + __m128i prevDelta = _mm_setzero_si128(); + int32_t numOfBatch = nelements >> 1; int32_t remainder = nelements & 0x01; __mmask16 mask2[16] = {0, 0x0001, 0x0003, 0x0007, 0x000f, 0x001f, 0x003f, 0x007f, 0x00ff}; From a5c8eaacaac548ef67d3e395164c21d556fc1a7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Nov 2023 23:52:19 +0800 Subject: [PATCH 18/20] fix(tsdb):clear the tsdb stt file reader flag when suspending the tsdbReader --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 67 +++++++++------------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 1 + 2 files changed, 27 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 751df706ab..5d018af6c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4148,6 +4148,20 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFreeClear(pReader); } +static void clearMemIterInfo(STableBlockScanInfo* pInfo) { + pInfo->iterInit = false; + pInfo->iter.hasVal = false; + pInfo->iiter.hasVal = false; + + if (pInfo->iter.iter != NULL) { + pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); + } + + if (pInfo->iiter.iter != NULL) { + pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); + } +} + int32_t tsdbReaderSuspend2(STsdbReader* pReader) { // save reader's base state & reset top state to be reconstructed from base state int32_t code = 0; @@ -4168,28 +4182,20 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); SCostSummary* pCost = &pReader->cost; + pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; - int32_t iter = 0; + int32_t iter = 0; while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - - pInfo->iterInit = false; - pInfo->iter.hasVal = false; - pInfo->iiter.hasVal = false; - - if (pInfo->iter.iter != NULL) { - pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); - } - - if (pInfo->iiter.iter != NULL) { - pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); - } - + clearMemIterInfo(pInfo); + pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); + pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData); } } else { @@ -4199,45 +4205,24 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - - pInfo->iterInit = false; - pInfo->iter.hasVal = false; - pInfo->iiter.hasVal = false; - - if (pInfo->iter.iter != NULL) { - pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); - } - - if (pInfo->iiter.iter != NULL) { - pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); - } - + clearMemIterInfo(pInfo); + pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); } - pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter; + pBlockScanInfo = (pStatus->pTableIter == NULL) ? NULL : *pStatus->pTableIter; if (pBlockScanInfo) { // save lastKey to restore memory iterator STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; - // reset current current table's data block scan info, - pBlockScanInfo->iterInit = false; - - pBlockScanInfo->iter.hasVal = false; - pBlockScanInfo->iiter.hasVal = false; - if (pBlockScanInfo->iter.iter != NULL) { - pBlockScanInfo->iter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iter.iter); - } - - if (pBlockScanInfo->iiter.iter != NULL) { - pBlockScanInfo->iiter.iter = tsdbTbDataIterDestroy(pBlockScanInfo->iiter.iter); - } + clearMemIterInfo(pBlockScanInfo); + pBlockScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; + pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList); // TODO: keep skyline for reuse - pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 305399e0af..24c526a906 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -210,6 +210,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->iterInit = false; p->iter.hasVal = false; p->iiter.hasVal = false; + p->sttKeyInfo.status = STT_FILE_READER_UNINIT; if (p->iter.iter != NULL) { p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); From 66f25712fd4f1e9e710e296f022509461e21ebb2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Nov 2023 18:07:15 +0800 Subject: [PATCH 19/20] fix(tsdb): fix bugs in suspend/resume. adjust code to conduct the test more easily. td-27579 --- source/client/test/clientTests.cpp | 102 ++++++++------ source/dnode/vnode/src/inc/vnodeInt.h | 4 + source/dnode/vnode/src/tsdb/tsdbRead2.c | 151 ++++++++------------- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 14 +- 4 files changed, 127 insertions(+), 144 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index addf0aa629..e78783cf3c 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -126,9 +126,9 @@ void queryCallback(void* param, void* res, int32_t code) { taos_fetch_raw_block_a(res, fetchCallback, param); } -void createNewTable(TAOS* pConn, int32_t index) { +void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t startTs, const char* pVarchar) { char str[1024] = {0}; - sprintf(str, "create table tu%d using st2 tags(%d)", index, index); + sprintf(str, "create table if not exists tu%d using st2 tags(%d)", index, index); TAOS_RES* pRes = taos_query(pConn, str); if (taos_errno(pRes) != 0) { @@ -136,22 +136,43 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for (int32_t i = 0; i < 10000; i += 20) { - char sql[1024] = {0}; - sprintf(sql, - "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" - "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", - index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, - i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, - i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); - TAOS_RES* p = taos_query(pConn, sql); - if (taos_errno(p) != 0) { - printf("failed to insert data, reason:%s\n", taos_errstr(p)); - } + if (startTs == 0) { + for (int32_t i = 0; i < numOfRows; i += 20) { + char sql[1024] = {0}; + sprintf(sql, + "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" + "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", + index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, + i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, + i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } - taos_free_result(p); + taos_free_result(p); + } + } else { + for (int32_t i = 0; i < numOfRows; i += 20) { + char sql[1024*50] = {0}; + sprintf(sql, + "insert into tu%d values(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, " + "%d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, " + "'%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')", + index, startTs, i, pVarchar, startTs + 1, i + 1, pVarchar, startTs + 2, i + 2, pVarchar, startTs + 3, i + 3, pVarchar, startTs + 4, i + 4, + pVarchar, startTs + 5, i + 5, pVarchar, startTs + 6, i + 6, pVarchar, startTs + 7, i + 7, pVarchar, startTs + 8, i + 8, pVarchar, startTs + 9, i + 9, + pVarchar, startTs + 10, i + 10, pVarchar, startTs + 11, i + 11, pVarchar, startTs + 12, i + 12, pVarchar, startTs + 13, i + 13, pVarchar, startTs + 14, + i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18, + pVarchar, startTs + 19, i + 19, pVarchar); + TAOS_RES* p = taos_query(pConn, sql); + if (taos_errno(p) != 0) { + printf("failed to insert data, reason:%s\n", taos_errstr(p)); + } + + taos_free_result(p); + } } } @@ -808,14 +829,7 @@ TEST(clientCase, projection_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)"); + pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); } @@ -828,28 +842,32 @@ TEST(clientCase, projection_query_tables) { taos_free_result(pRes); int64_t start = 1685959190000; + const char* pstr = + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefgh" + "ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnop" + "qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwx" + "yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef" + "ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:" + "QWERTYUIOP{}"; - int32_t code = -1; - for(int32_t i = 0; i < 1000000; ++i) { - char t[512] = {0}; + for(int32_t i = 0; i < 10000; ++i) { + char str[1024] = {0}; + sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i); - sprintf(t, "insert into t1 values(now, %d)", i); - while(1) { - void* p = taos_query(pConn, t); - code = taos_errno(p); - taos_free_result(p); - if (code != 0) { - printf("insert data error, retry\n"); - } else { - break; - } + TAOS_RES* px = taos_query(pConn, str); + if (taos_errno(px) != 0) { + printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(px); + } + + for(int32_t j = 0; j < 5000; ++j) { + start += 20; + for (int32_t i = 0; i < 10000; ++i) { + createNewTable(pConn, i, 20, start, pstr); } } - for (int32_t i = 0; i < 1; ++i) { - printf("create table :%d\n", i); - createNewTable(pConn, i); - } // // pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 16379db053..f3f84896ad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -93,7 +93,11 @@ typedef struct SQueryNode SQueryNode; #define VNODE_RSMA2_DIR "rsma2" #define VNODE_TQ_STREAM "stream" +#if SUSPEND_RESUME_TEST // only for test purpose +#define VNODE_BUFPOOL_SEGMENTS 1 +#else #define VNODE_BUFPOOL_SEGMENTS 3 +#endif #define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 5d018af6c1..4138d91f5d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -48,7 +48,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); -static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); +static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); @@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); +static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -168,7 +169,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo return TSDB_CODE_SUCCESS; } - SCostSummary* pCost = &pReader->cost; + SReadCostSummary* pCost = &pReader->cost; pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); @@ -291,11 +292,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) } static int32_t tsdbInitReaderLock(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - - code = taosThreadMutexInit(&pReader->readerMutex, NULL); - + int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL); qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; @@ -324,22 +321,14 @@ static int32_t tsdbAcquireReader(STsdbReader* pReader) { } static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - - code = taosThreadMutexTryLock(&pReader->readerMutex); - + int32_t code = taosThreadMutexTryLock(&pReader->readerMutex); qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbReleaseReader(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - - code = taosThreadMutexUnlock(&pReader->readerMutex); - + int32_t code = taosThreadMutexUnlock(&pReader->readerMutex); qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; @@ -432,6 +421,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void } tsdbInitReaderLock(pReader); + tsem_init(&pReader->resumeAfterSuspend, 0, 0); *ppReader = pReader; return code; @@ -1015,8 +1005,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - if (outOfTimeWindow(ts, - &pReader->info.window)) { // the remain data has out of query time window, ignore current block + if (outOfTimeWindow(ts, &pReader->info.window)) { + // the remain data has out of query time window, ignore current block setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } } else { @@ -1123,16 +1113,12 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo } int32_t step = asc ? 1 : -1; - // *nextIndex = pBlockInfo->tbBlockIdx + step; - // *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); memcpy(pRecord, &p->record, sizeof(SBrinRecord)); *nextIndex = pBlockInfo->tbBlockIdx + step; - - // tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); return true; } @@ -1376,23 +1362,19 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } - SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; - int64_t st = taosGetTimestampUs(); + SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); - blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]); - pBlock->info.id.uid = pBlockScanInfo->uid; + double el = (taosGetTimestampUs() - st) / 1000.0; + updateComposedBlockInfo(pReader, el, pBlockScanInfo); - setComposedBlockFlag(pReader, true); - - double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 " - %" PRId64 ", uid:%" PRIu64 ", %s", - pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, + pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid, pReader->idStr); - pReader->cost.buildmemBlock += elapsedTime; + pReader->cost.buildmemBlock += el; return code; } @@ -2293,13 +2275,12 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock return code; } -static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { +void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); - setComposedBlockFlag(pReader, true); pReader->cost.composedBlocks += 1; @@ -2356,7 +2337,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pBlockScanInfo = *pReader->status.pTableIter; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { - // setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order); return code; } } @@ -2436,7 +2416,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1; } -int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) { +int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost) { int32_t code = 0; int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData); if (newDelDataInFile == 0 && @@ -2935,6 +2915,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; STableUidList* pUidList = &pStatus->uidList; + tsdbDebug("seq load data blocks from cache, %s", pReader->idStr); + while (1) { if (pReader->code != TSDB_CODE_SUCCESS) { tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); @@ -3043,6 +3025,8 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SDataBlockIter* pBlockIter = &pReader->status.blockIter; + tsdbDebug("seq load data blocks from stt files %s", pReader->idStr); + while (1) { terrno = 0; @@ -3774,7 +3758,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e int32_t code = TSDB_CODE_SUCCESS; do { - // SRow* pTSRow = NULL; TSDBROW row = {.type = -1}; bool freeTSRow = false; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); @@ -3792,13 +3775,17 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e if (code) { return code; } + + pBlockScanInfo->lastProcKey = row.pTSRow->ts; } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) { break; } + pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow]; } + // no data in buffer, return immediately if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { break; @@ -4107,7 +4094,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); } - SCostSummary* pCost = &pReader->cost; + SReadCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; @@ -4122,6 +4109,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true); pReader->pReadSnap = NULL; + tsem_destroy(&pReader->resumeAfterSuspend); tsdbReleaseReader(pReader); tsdbUninitReaderLock(pReader); @@ -4148,26 +4136,14 @@ void tsdbReaderClose2(STsdbReader* pReader) { taosMemoryFreeClear(pReader); } -static void clearMemIterInfo(STableBlockScanInfo* pInfo) { - pInfo->iterInit = false; - pInfo->iter.hasVal = false; - pInfo->iiter.hasVal = false; - - if (pInfo->iter.iter != NULL) { - pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); - } - - if (pInfo->iiter.iter != NULL) { - pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); - } -} - int32_t tsdbReaderSuspend2(STsdbReader* pReader) { // save reader's base state & reset top state to be reconstructed from base state int32_t code = 0; SReaderStatus* pStatus = &pReader->status; STableBlockScanInfo* pBlockScanInfo = NULL; + pReader->status.suspendInvoked = true; // record the suspend status + if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { @@ -4181,55 +4157,34 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { tsdbDataFileReaderClose(&pReader->pFileReader); - SCostSummary* pCost = &pReader->cost; - + SReadCostSummary* pCost = &pReader->cost; pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); - - // resetDataBlockScanInfo excluding lastKey - STableBlockScanInfo** p = NULL; - - int32_t iter = 0; - while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { - STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - clearMemIterInfo(pInfo); - pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - - pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData); - } - } else { - // resetDataBlockScanInfo excluding lastKey - STableBlockScanInfo** p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { - STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - clearMemIterInfo(pInfo); - pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - } - - pBlockScanInfo = (pStatus->pTableIter == NULL) ? NULL : *pStatus->pTableIter; - if (pBlockScanInfo) { - // save lastKey to restore memory iterator - STimeWindow w = pReader->resBlockInfo.pResBlock->info.window; - pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey; - - clearMemIterInfo(pBlockScanInfo); - pBlockScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); - - pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); - pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList); - // TODO: keep skyline for reuse - } } + // resetDataBlockScanInfo excluding lastKey + STableBlockScanInfo** p = NULL; + + int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1; + + int32_t iter = 0; + while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { + STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; + clearBlockScanInfo(pInfo); + pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step; + } + + pStatus->uidList.currentIndex = 0; + initReaderStatus(pStatus); + tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); pReader->pReadSnap = NULL; pReader->flag = READER_STATUS_SUSPEND; +#if SUSPEND_RESUME_TEST + tsem_post(&pReader->resumeAfterSuspend); +#endif + tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, pReader->idStr); return code; @@ -4384,6 +4339,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { SReaderStatus* pStatus = &pReader->status; + // NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit + // the data should be ingested in round-robin and all the child tables should be createted before ingesting data + // the version range of query will be used to identify the correctness of suspend/resume functions. + // this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files +#if SUSPEND_RESUME_TEST + if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) { + tsem_wait(&pReader->resumeAfterSuspend); + } +#endif + code = tsdbAcquireReader(pReader); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 60e6e6960a..709e311ff0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -96,7 +96,7 @@ typedef struct SResultBlockInfo { int64_t capacity; } SResultBlockInfo; -typedef struct SCostSummary { +typedef struct SReadCostSummary { int64_t numOfBlocks; double blockLoadTime; double buildmemBlock; @@ -110,7 +110,7 @@ typedef struct SCostSummary { double createScanInfoList; double createSkylineIterTime; double initLastBlockReader; -} SCostSummary; +} SReadCostSummary; typedef struct STableUidList { uint64_t* tableUidList; // access table uid list in uid ascending order list @@ -122,12 +122,6 @@ typedef struct { int32_t numOfSttFiles; } SBlockNumber; -typedef struct SBlockIndex { - int32_t ordinalIndex; - int64_t inFileOffset; - STimeWindow window; // todo replace it with overlap flag. -} SBlockIndex; - typedef struct SBlockOrderWrapper { int64_t uid; int64_t offset; @@ -192,6 +186,7 @@ typedef struct SFileBlockDumpInfo { } SFileBlockDumpInfo; typedef struct SReaderStatus { + bool suspendInvoked; bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not SSHashObj* pTableMap; // SHash @@ -220,7 +215,8 @@ struct STsdbReader { int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; - SCostSummary cost; + tsem_t resumeAfterSuspend; + SReadCostSummary cost; SHashObj** pIgnoreTables; SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SDataFileReader* pFileReader; // the file reader From 28e17a61b932000c32c05d1ffe5c8a444e926cb3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Nov 2023 19:28:53 +0800 Subject: [PATCH 20/20] fix(tsdb): fix invalid read. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 4138d91f5d..cb899f9ee8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -3766,6 +3766,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e } if (row.type == TSDBROW_ROW_FMT) { + int64_t ts = row.pTSRow->ts;; code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); if (freeTSRow) { @@ -3776,7 +3777,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return code; } - pBlockScanInfo->lastProcKey = row.pTSRow->ts; + pBlockScanInfo->lastProcKey = ts; } else { code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); if (code) {