diff --git a/cmake/cmake.define b/cmake/cmake.define index 3343798686..56b6b7e1de 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -149,6 +149,8 @@ ELSE () CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA) CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX) CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2) + CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F) + CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI) IF (COMPILER_SUPPORT_SSE42) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") @@ -168,7 +170,13 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") ENDIF() - MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") + MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED") + + IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi") + MESSAGE(STATUS "avx512 supported by gcc") + ENDIF() ENDIF() # build mode diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eab3ecf04e..9d32912ece 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,6 +304,7 @@ typedef struct SCheckpointInfo { int64_t startTs; int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version + int64_t processedVer; // already processed ver, that has generated results version. int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id } SCheckpointInfo; diff --git a/include/os/osEnv.h b/include/os/osEnv.h index bc65da47a9..ac4ecd4212 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -36,11 +36,12 @@ extern int64_t tsStreamMax; extern float tsNumOfCores; extern int64_t tsTotalMemoryKB; extern char *tsProcPath; -extern char tsSIMDBuiltins; +extern char tsSIMDEnable; extern char tsSSE42Enable; extern char tsAVXEnable; extern char tsAVX2Enable; extern char tsFMAEnable; +extern char tsAVX512Enable; extern char tsTagFilterCache; extern char configDir[]; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 29b6f07dca..7a1df2b81c 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -41,7 +41,7 @@ int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); int32_t taosGetCpuCores(float *numOfCores, bool physical); void taosGetCpuUsage(double *cpu_system, double *cpu_engine); -int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma); +int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512); int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d70c6b725e..448fd7c653 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -494,7 +494,8 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "simdEnable", tsSIMDBuiltins, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "avx512", tsAVX512Enable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1; @@ -1023,7 +1024,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; - tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "simdEnable")->bval; + tsSIMDEnable = (bool)cfgGetItem(pCfg, "simdEnable")->bval; tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 135aab285b..198a9a55ce 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2584,7 +2584,7 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { } if (transId == 0) { - mError("failed to find the checkpoint trans, reset not executed"); + mDebug("failed to find the checkpoint trans, reset not executed"); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fd35f5f7eb..dd4bee27f0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1663,7 +1663,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) SStreamCheckpointSourceReq req = {0}; if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); + tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1671,7 +1671,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } if (!pTq->pVnode->restored) { - tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId); + tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1691,7 +1691,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } tDecoderClear(&decoder); - // todo handle failure to reset from checkpoint procedure SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, @@ -1702,7 +1701,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } - // todo handle failure to reset from checkpoint procedure // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. if (pTask->status.downstreamReady != 1) { pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id @@ -1723,17 +1721,32 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ETaskStatus status = streamTaskGetStatus(pTask, NULL); if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { - qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", + tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); - taosThreadMutexUnlock(&pTask->lock); + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } + + // check if the checkpoint msg already sent or not. + if (status == TASK_STATUS__CK) { + ASSERT(pTask->checkpointingId == req.checkpointId); + tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 + " already received, ignore this msg and continue process checkpoint", + pTask->id.idStr, pTask->checkpointingId); + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + + return TSDB_CODE_SUCCESS; + } + streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); @@ -1919,8 +1932,59 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { - streamMetaWUnLock(pMeta); + tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); + +#if 1 tqStartStreamTaskAsync(pTq, true); + streamMetaWUnLock(pMeta); +#else + streamMetaWUnLock(pMeta); + + // For debug purpose. + // the following procedure consume many CPU resource, result in the re-election of leader + // with high probability. So we employ it as a test case for the stream processing framework, with + // checkpoint/restart/nodeUpdate etc. + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + int32_t code = streamMetaReopen(pMeta); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d start all stream tasks after all being updated", vgId); + tqResetStreamTaskStatus(pTq); + tqStartStreamTaskAsync(pTq, false); + } else { + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + streamMetaWUnLock(pMeta); +#endif } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 44eb0351d7..fb745dbc86 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -302,7 +302,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks); + tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } @@ -451,7 +451,7 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num numOfNewItems += 1; int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.nextProcessVer = ver; - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", id, ver); + tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver); bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver); if (itemInFillhistory) { @@ -514,7 +514,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); - tqDebug("s-task:%s lock", pTask->id.idStr); char* p = NULL; ETaskStatus status = streamTaskGetStatus(pTask, &p); diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index 50df1b5067..e626c937da 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -565,7 +565,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem = pInput->numOfRows; pAvgRes->count += pInput->numOfRows; - bool simdAvailable = tsAVXEnable && tsSIMDBuiltins && (numOfRows > THRESHOLD_SIZE); + bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE); switch(type) { case TSDB_DATA_TYPE_UTINYINT: diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index 3ca1c06303..a6c91a57ce 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -370,7 +370,7 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -404,7 +404,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -438,7 +438,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -502,7 +502,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo float* val = (float*)&pBuf->v; // AVX version to speedup the loop - if (tsAVXEnable && tsSIMDBuiltins) { + if (tsAVXEnable && tsSIMDEnable) { *val = floatVectorCmpAVX(pData, numOfRows, isMinFunc); } else { if (!pBuf->assign) { @@ -533,7 +533,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR double* val = (double*)&pBuf->v; // AVX version to speedup the loop - if (tsAVXEnable && tsSIMDBuiltins) { + if (tsAVXEnable && tsSIMDEnable) { *val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc); } else { if (!pBuf->assign) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index b34b3420fe..441c71662e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,7 +44,7 @@ typedef struct { int64_t defaultCfInit; } SBackendWrapper; -void* streamBackendInit(const char* path, int64_t chkpId); +void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b22c6c9b0f..63dc497c6f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -469,11 +469,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -void* streamBackendInit(const char* streamPath, int64_t chkpId) { +void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); - stDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId); + stDebug("start to init stream backend at %s, checkpointid: %" PRId64 " vgId:%d", backendPath, chkpId, vgId); uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); @@ -534,7 +534,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - stDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle); + stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); taosMemoryFreeClear(backendPath); return (void*)pHandle; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 81840aaeb7..48b6486e05 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -297,9 +297,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; + p->chkInfo.checkpointVer = p->chkInfo.processedVer; + streamTaskClearCheckInfo(p); char* str = NULL; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 80927b36b9..f6ec6e9fdb 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -129,6 +129,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); taosMemoryFree(pDataSubmit->submit.msgStr); + taosFreeQitem(pDataSubmit); } SStreamMergedSubmit* streamMergedSubmitNew() { @@ -208,12 +209,10 @@ void streamFreeQitem(SStreamQueueItem* data) { if (type == STREAM_INPUT__GET_RES) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) { + destroyStreamDataBlock((SStreamDataBlock*)data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { streamDataSubmitDestroy((SStreamDataSubmit*)data); - taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; @@ -228,7 +227,7 @@ void streamFreeQitem(SStreamQueueItem* data) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); - } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { SStreamDataBlock* pBlock = (SStreamDataBlock*) data; taosArrayDestroyEx(pBlock->blocks, freeItems); taosFreeQitem(pBlock); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 701cc76086..94cd2f76f3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamQueueItem* pItem = pInput; stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); - int64_t ver = pTask->chkInfo.checkpointVer; + int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); int64_t resSize = 0; @@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) { stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, SIZE_IN_MiB(resSize), totalBlocks); - // update the currentVer if processing the submit blocks. - ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); + SCheckpointInfo* pInfo = &pTask->chkInfo; - if (ver != pTask->chkInfo.checkpointVer) { - stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64, - pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer); - pTask->chkInfo.checkpointVer = ver; + // update the currentVer if processing the submit blocks. + ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); + + if (ver != pInfo->processedVer) { + stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 + " ckpt:%" PRId64, + pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + pInfo->processedVer = ver; } streamFreeQitem(pInput); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 17cd9fac57..e6bbd89f02 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -195,10 +195,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->chkpDirLock); pMeta->chkpId = streamGetLatestCheckpointId(pMeta); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId); while (pMeta->streamBackend == NULL) { taosMsleep(100); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId); if (pMeta->streamBackend == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); } @@ -263,7 +263,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { + // todo: not wait in a critical region + while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); taosMsleep(100); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 63ee702ada..556de169b4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -270,7 +270,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); - taosFreeQitem(pItem); return -1; } @@ -280,7 +279,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { streamDataSubmitDestroy(px); - taosFreeQitem(pItem); return code; } @@ -296,13 +294,13 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); - destroyStreamDataBlock((SStreamDataBlock*)pItem); + streamFreeQitem(pItem); return -1; } int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - destroyStreamDataBlock((SStreamDataBlock*)pItem); + streamFreeQitem(pItem); return code; } @@ -312,7 +310,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) type == STREAM_INPUT__TRANS_STATE) { int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - taosFreeQitem(pItem); + streamFreeQitem(pItem); return code; } @@ -323,7 +321,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - taosFreeQitem(pItem); + streamFreeQitem(pItem); return code; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index ed96f2c4b0..32d6294de8 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -562,7 +562,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { taosMemoryFree(pBlock); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { - taosFreeQitem(pTranstate); return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a7fb590d1b..24228c0307 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; - pTask->chkInfo.checkpointVer = ver - 1; - pTask->chkInfo.nextProcessVer = ver; + pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint + pTask->chkInfo.processedVer = ver - 1; // already processed version + + pTask->chkInfo.nextProcessVer = ver; // next processed version pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index f63939ac9e..1b999e5fb0 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -12,7 +12,7 @@ class StreamStateEnv : public ::testing::Test { protected: virtual void SetUp() { streamMetaInit(); - backend = streamBackendInit(path, 0); + backend = streamBackendInit(path, 0, 0); } virtual void TearDown() { streamMetaCleanup(); diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 0fc136c693..54107db325 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,11 +37,12 @@ float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; char *tsProcPath = NULL; -char tsSIMDBuiltins = 0; +char tsSIMDEnable = 0; char tsSSE42Enable = 0; char tsAVXEnable = 0; char tsAVX2Enable = 0; char tsFMAEnable = 0; +char tsAVX512Enable = 0; void osDefaultInit() { taosSeedRand(taosSafeRand()); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 4816ec8f8b..fea7a4f63d 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -250,7 +250,7 @@ void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores, false); taosGetTotalMemory(&tsTotalMemoryKB); taosGetCpuUsage(NULL, NULL); - taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable); + taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable, &tsAVX512Enable); #endif } @@ -602,7 +602,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { : "0"(level)) // todo add for windows and mac -int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { +int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512) { #ifdef WINDOWS #elif defined(_TD_DARWIN_64) #else @@ -610,12 +610,6 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { #ifdef _TD_X86_ // Since the compiler is not support avx/avx2 instructions, the global variables always need to be // set to be false -//#if __AVX__ || __AVX2__ -// tsSIMDBuiltins = true; -//#else -// tsSIMDBuiltins = false; -//#endif - uint32_t eax = 0, ebx = 0, ecx = 0, edx = 0; int32_t ret = __get_cpuid(1, &eax, &ebx, &ecx, &edx); @@ -631,6 +625,7 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { // Ref to https://gcc.gnu.org/bugzilla/show_bug.cgi?id=77756 __cpuid_fix(7u, eax, ebx, ecx, edx); *avx2 = (char) ((ebx & bit_AVX2) == bit_AVX2); + *avx512 = (char)((ebx & bit_AVX512F) == bit_AVX512F); #endif // _TD_X86_ #endif diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 3fc3ef6be6..dc89a24180 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -283,7 +283,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha int32_t batch = num >> 2; int32_t remain = num & 0x03; if (selector == 0 || selector == 1) { - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { for (int32_t i = 0; i < batch; ++i) { __m256i prev = _mm256_set1_epi64x(prev_value); _mm256_storeu_si256((__m256i *)&p[_pos], prev); @@ -300,7 +300,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha } } } else { - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { __m256i base = _mm256_set1_epi64x(w); __m256i maskVal = _mm256_set1_epi64x(mask);