Merge pull request #23631 from taosdata/fix/liaohj

multiple fix
This commit is contained in:
Haojun Liao 2023-11-10 13:09:40 +08:00 committed by GitHub
commit 5824caa8f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 138 additions and 63 deletions

View File

@ -149,6 +149,8 @@ ELSE ()
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
IF (COMPILER_SUPPORT_SSE42)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
@ -168,7 +170,13 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
ENDIF()
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED")
IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi")
MESSAGE(STATUS "avx512 supported by gcc")
ENDIF()
ENDIF()
# build mode

View File

@ -304,6 +304,7 @@ typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t processedVer; // already processed ver, that has generated results version.
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id
} SCheckpointInfo;

View File

@ -36,11 +36,12 @@ extern int64_t tsStreamMax;
extern float tsNumOfCores;
extern int64_t tsTotalMemoryKB;
extern char *tsProcPath;
extern char tsSIMDBuiltins;
extern char tsSIMDEnable;
extern char tsSSE42Enable;
extern char tsAVXEnable;
extern char tsAVX2Enable;
extern char tsFMAEnable;
extern char tsAVX512Enable;
extern char tsTagFilterCache;
extern char configDir[];

View File

@ -41,7 +41,7 @@ int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
int32_t taosGetCpuCores(float *numOfCores, bool physical);
void taosGetCpuUsage(double *cpu_system, double *cpu_engine);
int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma);
int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512);
int32_t taosGetTotalMemory(int64_t *totalKB);
int32_t taosGetProcMemory(int64_t *usedKB);
int32_t taosGetSysMemory(int64_t *usedKB);

View File

@ -494,7 +494,8 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "simdEnable", tsSIMDBuiltins, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "avx512", tsAVX512Enable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1;
@ -1023,7 +1024,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "simdEnable")->bval;
tsSIMDEnable = (bool)cfgGetItem(pCfg, "simdEnable")->bval;
tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;

View File

@ -2584,7 +2584,7 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
}
if (transId == 0) {
mError("failed to find the checkpoint trans, reset not executed");
mDebug("failed to find the checkpoint trans, reset not executed");
return TSDB_CODE_SUCCESS;
}

View File

@ -1663,7 +1663,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
@ -1671,7 +1671,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId);
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
@ -1691,7 +1691,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
tDecoderClear(&decoder);
// todo handle failure to reset from checkpoint procedure
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
@ -1702,7 +1701,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
return TSDB_CODE_SUCCESS;
}
// todo handle failure to reset from checkpoint procedure
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
@ -1723,17 +1721,32 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
// check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) {
ASSERT(pTask->checkpointingId == req.checkpointId);
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->checkpointingId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);
@ -1919,8 +1932,59 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else {
streamMetaWUnLock(pMeta);
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
#if 1
tqStartStreamTaskAsync(pTq, true);
streamMetaWUnLock(pMeta);
#else
streamMetaWUnLock(pMeta);
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
}
}

View File

@ -302,7 +302,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
@ -451,7 +451,7 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num
numOfNewItems += 1;
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.nextProcessVer = ver;
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", id, ver);
tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
if (itemInFillhistory) {
@ -514,7 +514,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);
tqDebug("s-task:%s lock", pTask->id.idStr);
char* p = NULL;
ETaskStatus status = streamTaskGetStatus(pTask, &p);

View File

@ -565,7 +565,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
numOfElem = pInput->numOfRows;
pAvgRes->count += pInput->numOfRows;
bool simdAvailable = tsAVXEnable && tsSIMDBuiltins && (numOfRows > THRESHOLD_SIZE);
bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE);
switch(type) {
case TSDB_DATA_TYPE_UTINYINT:

View File

@ -370,7 +370,7 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start,
static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Enable && tsSIMDBuiltins) {
if (tsAVX2Enable && tsSIMDEnable) {
pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
@ -404,7 +404,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Enable && tsSIMDBuiltins) {
if (tsAVX2Enable && tsSIMDEnable) {
pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
@ -438,7 +438,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
// AVX2 version to speedup the loop
if (tsAVX2Enable && tsSIMDBuiltins) {
if (tsAVX2Enable && tsSIMDEnable) {
pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
@ -502,7 +502,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
float* val = (float*)&pBuf->v;
// AVX version to speedup the loop
if (tsAVXEnable && tsSIMDBuiltins) {
if (tsAVXEnable && tsSIMDEnable) {
*val = floatVectorCmpAVX(pData, numOfRows, isMinFunc);
} else {
if (!pBuf->assign) {
@ -533,7 +533,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
double* val = (double*)&pBuf->v;
// AVX version to speedup the loop
if (tsAVXEnable && tsSIMDBuiltins) {
if (tsAVXEnable && tsSIMDEnable) {
*val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc);
} else {
if (!pBuf->assign) {

View File

@ -44,7 +44,7 @@ typedef struct {
int64_t defaultCfInit;
} SBackendWrapper;
void* streamBackendInit(const char* path, int64_t chkpId);
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta);

View File

@ -469,11 +469,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0;
}
void* streamBackendInit(const char* streamPath, int64_t chkpId) {
void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
stDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId);
stDebug("start to init stream backend at %s, checkpointid: %" PRId64 " vgId:%d", backendPath, chkpId, vgId);
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
@ -534,7 +534,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
stDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle);
stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
taosMemoryFreeClear(backendPath);
return (void*)pHandle;

View File

@ -297,9 +297,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
continue;
}
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
p->chkInfo.checkpointId = p->checkpointingId;
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
streamTaskClearCheckInfo(p);
char* str = NULL;

View File

@ -129,6 +129,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
taosMemoryFree(pDataSubmit->submit.msgStr);
taosFreeQitem(pDataSubmit);
}
SStreamMergedSubmit* streamMergedSubmitNew() {
@ -208,12 +209,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
if (type == STREAM_INPUT__GET_RES) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
destroyStreamDataBlock((SStreamDataBlock*)data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitDestroy((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
@ -228,7 +227,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
blockDataDestroy(pRefBlock->pBlock);
taosFreeQitem(pRefBlock);
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*) data;
taosArrayDestroyEx(pBlock->blocks, freeItems);
taosFreeQitem(pBlock);

View File

@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
const SStreamQueueItem* pItem = pInput;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
int64_t ver = pTask->chkInfo.checkpointVer;
int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id);
int64_t resSize = 0;
@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(resSize), totalBlocks);
// update the currentVer if processing the submit blocks.
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
SCheckpointInfo* pInfo = &pTask->chkInfo;
if (ver != pTask->chkInfo.checkpointVer) {
stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64,
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
pTask->chkInfo.checkpointVer = ver;
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
}
streamFreeQitem(pInput);

View File

@ -195,10 +195,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
taosInitRWLatch(&pMeta->chkpDirLock);
pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);
while (pMeta->streamBackend == NULL) {
taosMsleep(100);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);
if (pMeta->streamBackend == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
}
@ -263,7 +263,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
}
}
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
// todo: not wait in a critical region
while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) {
stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
taosMsleep(100);
}

View File

@ -270,7 +270,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return -1;
}
@ -280,7 +279,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
streamDataSubmitDestroy(px);
taosFreeQitem(pItem);
return code;
}
@ -296,13 +294,13 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
destroyStreamDataBlock((SStreamDataBlock*)pItem);
streamFreeQitem(pItem);
return -1;
}
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*)pItem);
streamFreeQitem(pItem);
return code;
}
@ -312,7 +310,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
type == STREAM_INPUT__TRANS_STATE) {
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem);
streamFreeQitem(pItem);
return code;
}
@ -323,7 +321,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
// use the default memory limit, refactor later.
int32_t code = taosWriteQitem(pQueue, pItem);
if (code != TSDB_CODE_SUCCESS) {
taosFreeQitem(pItem);
streamFreeQitem(pItem);
return code;
}

View File

@ -562,7 +562,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
taosMemoryFree(pBlock);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
taosFreeQitem(pTranstate);
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
pTask->chkInfo.checkpointVer = ver - 1;
pTask->chkInfo.nextProcessVer = ver;
pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint
pTask->chkInfo.processedVer = ver - 1; // already processed version
pTask->chkInfo.nextProcessVer = ver; // next processed version
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;

View File

@ -12,7 +12,7 @@ class StreamStateEnv : public ::testing::Test {
protected:
virtual void SetUp() {
streamMetaInit();
backend = streamBackendInit(path, 0);
backend = streamBackendInit(path, 0, 0);
}
virtual void TearDown() {
streamMetaCleanup();

View File

@ -37,11 +37,12 @@ float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0;
char *tsProcPath = NULL;
char tsSIMDBuiltins = 0;
char tsSIMDEnable = 0;
char tsSSE42Enable = 0;
char tsAVXEnable = 0;
char tsAVX2Enable = 0;
char tsFMAEnable = 0;
char tsAVX512Enable = 0;
void osDefaultInit() {
taosSeedRand(taosSafeRand());

View File

@ -250,7 +250,7 @@ void taosGetSystemInfo() {
taosGetCpuCores(&tsNumOfCores, false);
taosGetTotalMemory(&tsTotalMemoryKB);
taosGetCpuUsage(NULL, NULL);
taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable);
taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable, &tsAVX512Enable);
#endif
}
@ -602,7 +602,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
: "0"(level))
// todo add for windows and mac
int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512) {
#ifdef WINDOWS
#elif defined(_TD_DARWIN_64)
#else
@ -610,12 +610,6 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
#ifdef _TD_X86_
// Since the compiler is not support avx/avx2 instructions, the global variables always need to be
// set to be false
//#if __AVX__ || __AVX2__
// tsSIMDBuiltins = true;
//#else
// tsSIMDBuiltins = false;
//#endif
uint32_t eax = 0, ebx = 0, ecx = 0, edx = 0;
int32_t ret = __get_cpuid(1, &eax, &ebx, &ecx, &edx);
@ -631,6 +625,7 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
// Ref to https://gcc.gnu.org/bugzilla/show_bug.cgi?id=77756
__cpuid_fix(7u, eax, ebx, ecx, edx);
*avx2 = (char) ((ebx & bit_AVX2) == bit_AVX2);
*avx512 = (char)((ebx & bit_AVX512F) == bit_AVX512F);
#endif // _TD_X86_
#endif

View File

@ -283,7 +283,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
int32_t batch = num >> 2;
int32_t remain = num & 0x03;
if (selector == 0 || selector == 1) {
if (tsAVX2Enable && tsSIMDBuiltins) {
if (tsAVX2Enable && tsSIMDEnable) {
for (int32_t i = 0; i < batch; ++i) {
__m256i prev = _mm256_set1_epi64x(prev_value);
_mm256_storeu_si256((__m256i *)&p[_pos], prev);
@ -300,7 +300,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
}
}
} else {
if (tsAVX2Enable && tsSIMDBuiltins) {
if (tsAVX2Enable && tsSIMDEnable) {
__m256i base = _mm256_set1_epi64x(w);
__m256i maskVal = _mm256_set1_epi64x(mask);