Merge pull request #23646 from taosdata/fix/3_liaohj
fix: merge bugs fix patch from main.
commit 08d9277113
@@ -149,6 +149,8 @@ ELSE ()
     CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
     CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
     CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
+    CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
+    CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)

     IF (COMPILER_SUPPORT_SSE42)
       SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2")
@@ -168,7 +170,13 @@ ELSE ()
       SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2")
       SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2")
     ENDIF()
-    MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED")
+    MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED")

+    IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI)
+      SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi")
+      SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi")
+      MESSAGE(STATUS "avx512 supported by gcc")
+    ENDIF()
   ENDIF()
 ENDIF()

 # build mode
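Note: the new compile-time checks above only add the -mavx512f/-mavx512vbmi build flags; whether the AVX-512 code path actually runs is still decided at runtime by the tsSIMDEnable and tsAVX512Enable globals introduced later in this patch. A minimal sketch of how such a gate is typically combined (the helper name avx512Usable is hypothetical, not part of the patch):

#include <stdbool.h>

extern char tsSIMDEnable;    // config switch, renamed from tsSIMDBuiltins in this PR
extern char tsAVX512Enable;  // set from CPUID at startup, added in this PR

// Sketch only: both the config switch and the detected CPU capability must allow it.
static inline bool avx512Usable(void) {
  return tsSIMDEnable && tsAVX512Enable;
}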
@@ -304,6 +304,7 @@ typedef struct SCheckpointInfo {
   int64_t startTs;
   int64_t checkpointId;
   int64_t checkpointVer;   // latest checkpointId version
+  int64_t processedVer;    // already processed ver, that has generated results version.
   int64_t nextProcessVer;  // current offset in WAL, not serialize it
   int64_t failedId;        // record the latest failed checkpoint id
 } SCheckpointInfo;

@@ -460,7 +461,7 @@ typedef struct STaskStartInfo {
   int32_t   taskStarting;    // restart flag, sentinel to guard the restart procedure.
   SHashObj* pReadyTaskSet;   // tasks that are all ready for running stream processing
   SHashObj* pFailedTaskSet;  // tasks that are done the check downstream process, may be successful or failed
-  int32_t   elapsedTime;
+  int64_t   elapsedTime;
 } STaskStartInfo;

 typedef struct STaskUpdateInfo {
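Note: the new processedVer field sits between the persisted checkpoint version and the next WAL offset; later hunks in this patch initialize it in streamTaskInit() and assert the ordering in streamSaveAllTaskStatus()/streamExecForAll(). An illustrative sketch of that invariant, assuming the mirror struct below stands in for SCheckpointInfo:

#include <assert.h>
#include <stdint.h>

typedef struct {
  int64_t checkpointVer;   // version already covered by the last generated checkpoint
  int64_t processedVer;    // version whose results have already been produced
  int64_t nextProcessVer;  // next version to pull from the WAL
} SCheckpointInfoSketch;   // hypothetical mirror of SCheckpointInfo, for illustration only

static void checkVersionOrder(const SCheckpointInfoSketch* p) {
  assert(p->checkpointVer <= p->processedVer);
  assert(p->processedVer <= p->nextProcessVer);
}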
@@ -36,11 +36,12 @@ extern int64_t tsStreamMax;
 extern float   tsNumOfCores;
 extern int64_t tsTotalMemoryKB;
 extern char   *tsProcPath;
-extern char    tsSIMDBuiltins;
+extern char    tsSIMDEnable;
 extern char    tsSSE42Enable;
 extern char    tsAVXEnable;
 extern char    tsAVX2Enable;
 extern char    tsFMAEnable;
+extern char    tsAVX512Enable;
 extern char    tsTagFilterCache;

 extern char    configDir[];

@@ -41,7 +41,7 @@ int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t
 int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
 int32_t taosGetCpuCores(float *numOfCores, bool physical);
 void    taosGetCpuUsage(double *cpu_system, double *cpu_engine);
-int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma);
+int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512);
 int32_t taosGetTotalMemory(int64_t *totalKB);
 int32_t taosGetProcMemory(int64_t *usedKB);
 int32_t taosGetSysMemory(int64_t *usedKB);
@@ -249,7 +249,7 @@ int32_t tsTransPullupInterval = 2;
 int32_t tsMqRebalanceInterval = 2;
 int32_t tsStreamCheckpointInterval = 60;
 float   tsSinkDataRate = 2.0;
-int32_t tsStreamNodeCheckInterval = 30;
+int32_t tsStreamNodeCheckInterval = 15;
 int32_t tsTtlUnit = 86400;
 int32_t tsTtlPushIntervalSec = 10;
 int32_t tsTrimVDbIntervalSec = 60 * 60;  // interval of trimming db in all vgroups

@@ -529,7 +529,8 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) {
   if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
   if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
   if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
-  if (cfgAddBool(pCfg, "simdEnable", tsSIMDBuiltins, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
+  if (cfgAddBool(pCfg, "avx512", tsAVX512Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
+  if (cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
   if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;

   if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;

@@ -1101,7 +1102,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
   tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
   tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;

-  tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "simdEnable")->bval;
+  tsSIMDEnable = (bool)cfgGetItem(pCfg, "simdEnable")->bval;
   tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval;

   tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
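Note: the two tglobal.c hunks above show the usual two-step pattern for a boolean option: register the default with cfgAddBool() in taosAddSystemCfg(), then read it back with cfgGetItem() when the config is applied. A hedged sketch of the same pattern for a made-up option name (the option "myFeatureEnable" and its variable are hypothetical, only the cfgAddBool/cfgGetItem calls mirror this hunk):

#include <stdbool.h>

static char tsMyFeatureEnable = 1;  // hypothetical default

static int32_t addMyCfg(SConfig *pCfg) {
  // register the option so it can be set in the config file or via SQL
  if (cfgAddBool(pCfg, "myFeatureEnable", tsMyFeatureEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
  return 0;
}

static void readMyCfg(SConfig *pCfg) {
  // copy the parsed value back into the global switch
  tsMyFeatureEnable = (bool)cfgGetItem(pCfg, "myFeatureEnable")->bval;
}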
@@ -43,7 +43,7 @@ typedef struct SNodeEntry {
 } SNodeEntry;

 typedef struct SStreamExecInfo {
-  SArray   *pNodeEntryList;
+  SArray   *pNodeList;
   int64_t   ts;                // snapshot ts
   int64_t   activeCheckpoint;  // active check point id
   SHashObj *pTaskMap;

@@ -850,7 +850,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
   mndTransDrop(pTrans);

   taosThreadMutexLock(&execInfo.lock);
-  mDebug("register to stream task node list");
+  mDebug("stream tasks register into node list");
   keepStreamTasksInBuf(&streamObj, &execInfo);
   taosThreadMutexUnlock(&execInfo.lock);

@@ -1141,6 +1141,15 @@ static const char *mndGetStreamDB(SMnode *pMnode) {
   return p;
 }

+static int32_t initStreamNodeList(SMnode* pMnode) {
+  if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
+    execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
+    execInfo.pNodeList = extractNodeListFromStream(pMnode);
+  }
+
+  return taosArrayGetSize(execInfo.pNodeList);
+}
+
 static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
   SMnode *pMnode = pReq->info.node;
   SSdb   *pSdb = pMnode->pSdb;
@@ -1151,22 +1160,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
   {  // check if the node update happens or not
     int64_t ts = taosGetTimestampSec();

-    if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
-      if (execInfo.pNodeEntryList != NULL) {
-        execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
-      }
-
-      execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
-    }
-
-    if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
+    taosThreadMutexLock(&execInfo.lock);
+    int32_t numOfNodes = initStreamNodeList(pMnode);
+    taosThreadMutexUnlock(&execInfo.lock);
+
+    if (numOfNodes == 0) {
       mDebug("stream task node change checking done, no vgroups exist, do nothing");
       execInfo.ts = ts;
       return 0;
     }

-    for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
-      SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
+    for (int32_t i = 0; i < numOfNodes; ++i) {
+      SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
       if (pNodeEntry->stageUpdated) {
         mDebug("stream task not ready due to node update detected, checkpoint not issued");
         return 0;

@@ -1180,7 +1185,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
       return 0;
     }

-    SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
+    SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
     bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
     taosArrayDestroy(changeInfo.pUpdateNodeList);
     taosHashCleanup(changeInfo.pDBMap);
@@ -2095,20 +2100,21 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
       break;
     }

-    SNodeEntry entry = {0};
+    SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
     entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
-    entry.nodeId = pVgroup->vgId;
-    entry.hbTimestamp = pVgroup->updateTime;

+    // if not all ready till now, no need to check the remaining vgroups.
     if (*allReady) {
       for (int32_t i = 0; i < pVgroup->replica; ++i) {
         if (!pVgroup->vnodeGid[i].syncRestore) {
+          mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
           *allReady = false;
           break;
         }

         ESyncState state = pVgroup->vnodeGid[i].syncState;
         if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
+          mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
           *allReady = false;
           break;
         }
@@ -2314,9 +2320,9 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
          (int32_t)taosArrayGetSize(execInfo.pTaskList));

   int32_t size = taosArrayGetSize(pNodeSnapshot);
-  SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
-  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
-    SNodeEntry *p = taosArrayGet(execInfo.pNodeEntryList, i);
+  SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
+  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
+    SNodeEntry* p = taosArrayGet(execInfo.pNodeList, i);

     for (int32_t j = 0; j < size; ++j) {
       SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);

@@ -2327,8 +2333,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
     }
   }

-  execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
-  execInfo.pNodeEntryList = pValidNodeEntryList;
+  taosArrayDestroy(execInfo.pNodeList);
+  execInfo.pNodeList = pValidNodeEntryList;

   mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
   taosArrayDestroy(pRemovedTasks);

@@ -2338,6 +2344,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
 // this function runs by only one thread, so it is not multi-thread safe
 static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   int32_t code = 0;

   int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
   if (old != 0) {
     mDebug("still in checking node change");
@@ -2348,23 +2355,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   int64_t ts = taosGetTimestampSec();

   SMnode *pMnode = pMsg->info.node;
-  if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
-    if (execInfo.pNodeEntryList != NULL) {
-      execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
-    }
-    execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
-  }
-
-  if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
+
+  taosThreadMutexLock(&execInfo.lock);
+  int32_t numOfNodes = initStreamNodeList(pMnode);
+  taosThreadMutexUnlock(&execInfo.lock);
+
+  if (numOfNodes == 0) {
     mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
     execInfo.ts = ts;
     atomic_store_32(&mndNodeCheckSentinel, 0);
     return 0;
   }

-  bool    allVnodeReady = true;
-  SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
-  if (!allVnodeReady) {
+  bool    allVgroupsReady = true;
+  SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady);
+  if (!allVgroupsReady) {
     taosArrayDestroy(pNodeSnapshot);
     atomic_store_32(&mndNodeCheckSentinel, 0);
     mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");

@@ -2374,7 +2379,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   taosThreadMutexLock(&execInfo.lock);
   removeExpirednodeEntryAndTask(pNodeSnapshot);

-  SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
+  SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
   if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
     // kill current active checkpoint transaction, since the transaction is vnode wide.
     doKillActiveCheckpointTrans(pMnode);

@@ -2383,8 +2388,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
   // keep the new vnode snapshot
   if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
     mDebug("create trans successfully, update cached node list");
-    taosArrayDestroy(execInfo.pNodeEntryList);
-    execInfo.pNodeEntryList = pNodeSnapshot;
+    taosArrayDestroy(execInfo.pNodeList);
+    execInfo.pNodeList = pNodeSnapshot;
     execInfo.ts = ts;
   } else {
     mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));

@@ -2584,7 +2589,7 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
   }

   if (transId == 0) {
-    mError("failed to find the checkpoint trans, reset not executed");
+    mDebug("failed to find the checkpoint trans, reset not executed");
     return TSDB_CODE_SUCCESS;
   }
@@ -2623,16 +2628,18 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {

 int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
   int32_t num = taosArrayGetSize(pNodeList);
+  mInfo("set node expired for %d nodes", num);

   for (int k = 0; k < num; ++k) {
-    int32_t *pVgId = taosArrayGet(pNodeList, k);
+    int32_t* pVgId = taosArrayGet(pNodeList, k);
+    mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);

-    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
+    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
     for (int i = 0; i < numOfNodes; ++i) {
-      SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
+      SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);

       if (pNodeEntry->nodeId == *pVgId) {
-        mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
+        mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
         pNodeEntry->stageUpdated = true;
         break;
       }

@@ -2642,10 +2649,10 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
   return TSDB_CODE_SUCCESS;
 }

-static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) {
-  int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
-  for (int32_t j = 0; j < numOfNodes; ++j) {
-    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
+static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
+  int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
+  for (int32_t j = 0; j < numOfNodes; ++j) {
+    SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
     if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
       mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
             pTaskEntry->stage, stage, pTaskEntry->id.taskId);
@@ -2677,12 +2684,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
   mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);

   taosThreadMutexLock(&execInfo.lock);

+  // extract stream task list
   int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
   if (numOfExisted == 0) {
     doExtractTasksFromStream(pMnode);
   }

-  setNodeEpsetExpiredFlag(req.pUpdateNodes);
+  initStreamNodeList(pMnode);
+
+  int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
+  if (numOfUpdated > 0) {
+    mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
+    setNodeEpsetExpiredFlag(req.pUpdateNodes);
+  }

   for (int32_t i = 0; i < req.numOfTasks; ++i) {
     STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
@@ -1668,7 +1668,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)

   SStreamCheckpointSourceReq req = {0};
   if (!vnodeIsRoleLeader(pTq->pVnode)) {
-    tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
+    tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
     SRpcMsg rsp = {0};
     buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
     tmsgSendRsp(&rsp);  // error occurs

@@ -1676,7 +1676,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   }

   if (!pTq->pVnode->restored) {
-    tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId);
+    tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
     SRpcMsg rsp = {0};
     buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
     tmsgSendRsp(&rsp);  // error occurs

@@ -1696,7 +1696,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   }
   tDecoderClear(&decoder);

-  // todo handle failure to reset from checkpoint procedure
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,

@@ -1707,7 +1706,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
     return TSDB_CODE_SUCCESS;
   }

-  // todo handle failure to reset from checkpoint procedure
   // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
   if (pTask->status.downstreamReady != 1) {
     pTask->chkInfo.failedId = req.checkpointId;  // record the latest failed checkpoint id

@@ -1728,17 +1726,32 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   ETaskStatus status = streamTaskGetStatus(pTask, NULL);

   if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
-    qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
+    tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
            pTask->id.idStr, req.checkpointId);
-    taosThreadMutexUnlock(&pTask->lock);

+    taosThreadMutexUnlock(&pTask->lock);
     streamMetaReleaseTask(pMeta, pTask);

     SRpcMsg rsp = {0};
     buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
     tmsgSendRsp(&rsp);  // error occurs

     return TSDB_CODE_SUCCESS;
   }

+  // check if the checkpoint msg already sent or not.
+  if (status == TASK_STATUS__CK) {
+    ASSERT(pTask->checkpointingId == req.checkpointId);
+    tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
+           " already received, ignore this msg and continue process checkpoint",
+           pTask->id.idStr, pTask->checkpointingId);
+
+    taosThreadMutexUnlock(&pTask->lock);
+    streamMetaReleaseTask(pMeta, pTask);
+
+    return TSDB_CODE_SUCCESS;
+  }
+
   streamProcessCheckpointSourceReq(pTask, &req);
   taosThreadMutexUnlock(&pTask->lock);
@@ -1924,8 +1937,59 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
     pMeta->startInfo.tasksWillRestart = 0;
     streamMetaWUnLock(pMeta);
   } else {
-    streamMetaWUnLock(pMeta);
+    tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
+#if 1
     tqStartStreamTaskAsync(pTq, true);
+    streamMetaWUnLock(pMeta);
+#else
+    streamMetaWUnLock(pMeta);
+
+    // For debug purpose.
+    // the following procedure consume many CPU resource, result in the re-election of leader
+    // with high probability. So we employ it as a test case for the stream processing framework, with
+    // checkpoint/restart/nodeUpdate etc.
+    while(1) {
+      int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
+      if (startVal == 0) {
+        break;
+      }
+
+      tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
+      taosMsleep(500);
+    }
+
+    while (streamMetaTaskInTimer(pMeta)) {
+      tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
+      taosMsleep(100);
+    }
+
+    streamMetaWLock(pMeta);
+
+    int32_t code = streamMetaReopen(pMeta);
+    if (code != 0) {
+      tqError("vgId:%d failed to reopen stream meta", vgId);
+      streamMetaWUnLock(pMeta);
+      taosArrayDestroy(req.pNodeList);
+      return -1;
+    }
+
+    if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
+      tqError("vgId:%d failed to load stream tasks", vgId);
+      streamMetaWUnLock(pMeta);
+      taosArrayDestroy(req.pNodeList);
+      return -1;
+    }
+
+    if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
+      tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
+      tqResetStreamTaskStatus(pTq);
+      tqStartStreamTaskAsync(pTq, false);
+    } else {
+      tqInfo("vgId:%d, follower node not start stream tasks", vgId);
+    }
+    streamMetaWUnLock(pMeta);
+#endif
   }
 }
@@ -299,7 +299,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq) {
   int32_t vgId = TD_VID(pTq->pVnode);
   int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

-  tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks);
+  tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
   if (numOfTasks == 0) {
     return TSDB_CODE_SUCCESS;
   }

@@ -448,7 +448,7 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num
       numOfNewItems += 1;
       int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
       pTask->chkInfo.nextProcessVer = ver;
-      tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", id, ver);
+      tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver);

       bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver);
       if (itemInFillhistory) {
@@ -565,7 +565,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
   numOfElem = pInput->numOfRows;
   pAvgRes->count += pInput->numOfRows;

-  bool simdAvailable = tsAVXEnable && tsSIMDBuiltins && (numOfRows > THRESHOLD_SIZE);
+  bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE);

   switch(type) {
     case TSDB_DATA_TYPE_UTINYINT:

@@ -370,7 +370,7 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start,
 static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
                           bool signVal) {
   // AVX2 version to speedup the loop
-  if (tsAVX2Enable && tsSIMDBuiltins) {
+  if (tsAVX2Enable && tsSIMDEnable) {
     pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
   } else {
     if (!pBuf->assign) {

@@ -404,7 +404,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
 static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
                            bool signVal) {
   // AVX2 version to speedup the loop
-  if (tsAVX2Enable && tsSIMDBuiltins) {
+  if (tsAVX2Enable && tsSIMDEnable) {
     pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
   } else {
     if (!pBuf->assign) {

@@ -438,7 +438,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
 static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
                            bool signVal) {
   // AVX2 version to speedup the loop
-  if (tsAVX2Enable && tsSIMDBuiltins) {
+  if (tsAVX2Enable && tsSIMDEnable) {
     pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
   } else {
     if (!pBuf->assign) {

@@ -502,7 +502,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
   float* val = (float*)&pBuf->v;

   // AVX version to speedup the loop
-  if (tsAVXEnable && tsSIMDBuiltins) {
+  if (tsAVXEnable && tsSIMDEnable) {
     *val = floatVectorCmpAVX(pData, numOfRows, isMinFunc);
   } else {
     if (!pBuf->assign) {

@@ -533,7 +533,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
   double* val = (double*)&pBuf->v;

   // AVX version to speedup the loop
-  if (tsAVXEnable && tsSIMDBuiltins) {
+  if (tsAVXEnable && tsSIMDEnable) {
     *val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc);
   } else {
     if (!pBuf->assign) {
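Note: every handler touched above follows the same runtime-dispatch shape: try the vectorized path only when both the CPU flag (tsAVX2Enable/tsAVXEnable) and the simdEnable config switch (now tsSIMDEnable) allow it, otherwise fall back to a scalar loop. A hedged sketch of that shape; i32VectorMaxAVX2 stands in for the real i32VectorCmpAVX2 helpers and is assumed, not patch code:

#include <stdint.h>

extern char tsSIMDEnable;
extern char tsAVX2Enable;

// Illustrative only: compute the max of an int32 column with optional SIMD dispatch.
static int32_t colMaxSketch(const int32_t* data, int32_t numOfRows) {
  if (tsAVX2Enable && tsSIMDEnable) {
    // return i32VectorMaxAVX2(data, numOfRows);   // vectorized path (assumed helper)
  }
  int32_t v = data[0];                             // scalar fallback
  for (int32_t i = 1; i < numOfRows; ++i) {
    if (data[i] > v) v = data[i];
  }
  return v;
}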
@@ -44,7 +44,7 @@ typedef struct {
   int64_t defaultCfInit;
 } SBackendWrapper;

-void*   streamBackendInit(const char* path, int64_t chkpId);
+void*   streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
 void    streamBackendCleanup(void* arg);
 void    streamBackendHandleCleanup(void* arg);
 int32_t streamBackendLoadCheckpointInfo(void* pMeta);

@@ -469,11 +469,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
   return 0;
 }

-void* streamBackendInit(const char* streamPath, int64_t chkpId) {
+void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
   char*   backendPath = NULL;
   int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);

-  stDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId);
+  stDebug("start to init stream backend at %s, checkpointid: %" PRId64 " vgId:%d", backendPath, chkpId, vgId);

   uint32_t         dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
   SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));

@@ -534,7 +534,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
   if (cfs != NULL) {
     rocksdb_list_column_families_destroy(cfs, nCf);
   }
-  stDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle);
+  stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
   taosMemoryFreeClear(backendPath);

   return (void*)pHandle;
@@ -299,9 +299,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
       continue;
     }

-    ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
+    ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId &&
+           p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
+
     p->chkInfo.checkpointId = p->checkpointingId;
+    p->chkInfo.checkpointVer = p->chkInfo.processedVer;

     streamTaskClearCheckInfo(p);

     char* str = NULL;

@@ -129,6 +129,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
 void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
   ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
   taosMemoryFree(pDataSubmit->submit.msgStr);
+  taosFreeQitem(pDataSubmit);
 }

 SStreamMergedSubmit* streamMergedSubmitNew() {
@@ -208,12 +209,10 @@ void streamFreeQitem(SStreamQueueItem* data) {
   if (type == STREAM_INPUT__GET_RES) {
     blockDataDestroy(((SStreamTrigger*)data)->pBlock);
     taosFreeQitem(data);
-  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) {
-    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
-    taosFreeQitem(data);
+  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
+    destroyStreamDataBlock((SStreamDataBlock*)data);
   } else if (type == STREAM_INPUT__DATA_SUBMIT) {
     streamDataSubmitDestroy((SStreamDataSubmit*)data);
-    taosFreeQitem(data);
   } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
     SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;

@@ -228,7 +227,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
     SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
     blockDataDestroy(pRefBlock->pBlock);
     taosFreeQitem(pRefBlock);
-  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
+  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) {
     SStreamDataBlock* pBlock = (SStreamDataBlock*) data;
     taosArrayDestroyEx(pBlock->blocks, freeItems);
     taosFreeQitem(pBlock);
@@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     const SStreamQueueItem* pItem = pInput;
     stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);

-    int64_t ver = pTask->chkInfo.checkpointVer;
+    int64_t ver = pTask->chkInfo.processedVer;
     doSetStreamInputBlock(pTask, pInput, &ver, id);

     int64_t resSize = 0;

@@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
             SIZE_IN_MiB(resSize), totalBlocks);

-    // update the currentVer if processing the submit blocks.
-    ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
-
-    if (ver != pTask->chkInfo.checkpointVer) {
-      stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64,
-              pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
-      pTask->chkInfo.checkpointVer = ver;
+    SCheckpointInfo* pInfo = &pTask->chkInfo;
+
+    // update the currentVer if processing the submit blocks.
+    ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
+
+    if (ver != pInfo->processedVer) {
+      stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
+              " ckpt:%" PRId64,
+              pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
+      pInfo->processedVer = ver;
     }

     streamFreeQitem(pInput);
@@ -194,10 +194,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
   taosInitRWLatch(&pMeta->chkpDirLock);

   pMeta->chkpId = streamMetaGetLatestCheckpointId(pMeta);
-  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
+  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
   while (pMeta->streamBackend == NULL) {
     taosMsleep(100);
-    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
+    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);
     if (pMeta->streamBackend == NULL) {
       stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
     }

@@ -262,7 +262,8 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) {
     }
   }

-  while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) {
+  // todo: not wait in a critical region
+  while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) {
     stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
     taosMsleep(100);
   }
@@ -852,6 +853,37 @@ static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
   taosArrayDestroy(pIdList);
 }

+static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
+  int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
+  for (int k = 0; k < numOfExisted; ++k) {
+    if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
+      return true;
+    }
+  }
+  return false;
+}
+
+static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
+  SStreamMeta* pMeta = pTask->pMeta;
+
+  taosThreadMutexLock(&pTask->lock);
+
+  int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
+  for (int j = 0; j < num; ++j) {
+    SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
+
+    bool exist = existInHbMsg(pMsg, pTaskEpset);
+    if (!exist) {
+      taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
+      stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
+              (int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
+    }
+  }
+
+  taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);
+  taosThreadMutexUnlock(&pTask->lock);
+}
+
 void metaHbToMnode(void* param, void* tmrId) {
   int64_t rid = *(int64_t*)param;

@@ -947,28 +979,7 @@ void metaHbToMnode(void* param, void* tmrId) {
       walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
     }

-    taosThreadMutexLock(&(*pTask)->lock);
-    int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
-    for (int j = 0; j < num; ++j) {
-      int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
-
-      bool    exist = false;
-      int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
-      for (int k = 0; k < numOfExisted; ++k) {
-        if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
-          exist = true;
-          break;
-        }
-      }
-
-      if (!exist) {
-        taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
-      }
-    }
-
-    taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
-    taosThreadMutexUnlock(&(*pTask)->lock);
-
+    addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
     taosArrayPush(hbMsg.pTaskStatus, &entry);
     if (!hasMnodeEpset) {
       epsetAssign(&epset, &(*pTask)->info.mnodeEpset);

@@ -1008,7 +1019,7 @@ void metaHbToMnode(void* param, void* tmrId) {

     pMeta->pHbInfo->hbCount += 1;

-    stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
+    stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
             pMeta->pHbInfo->hbCount);
     tmsgSendReq(&epset, &msg);
   } else {
@@ -270,7 +270,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
           "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
           pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
       streamDataSubmitDestroy(px);
-      taosFreeQitem(pItem);
       return -1;
     }

@@ -280,7 +279,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
     int32_t code = taosWriteQitem(pQueue, pItem);
     if (code != TSDB_CODE_SUCCESS) {
       streamDataSubmitDestroy(px);
-      taosFreeQitem(pItem);
       return code;
     }

@@ -296,13 +294,13 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)

      stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
              pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size);
-     destroyStreamDataBlock((SStreamDataBlock*)pItem);
+     streamFreeQitem(pItem);
      return -1;
    }

    int32_t code = taosWriteQitem(pQueue, pItem);
    if (code != TSDB_CODE_SUCCESS) {
-     destroyStreamDataBlock((SStreamDataBlock*)pItem);
+     streamFreeQitem(pItem);
      return code;
    }

@@ -312,7 +310,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
             type == STREAM_INPUT__TRANS_STATE) {
     int32_t code = taosWriteQitem(pQueue, pItem);
     if (code != TSDB_CODE_SUCCESS) {
-      taosFreeQitem(pItem);
+      streamFreeQitem(pItem);
       return code;
     }

@@ -323,7 +321,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
     // use the default memory limit, refactor later.
     int32_t code = taosWriteQitem(pQueue, pItem);
     if (code != TSDB_CODE_SUCCESS) {
-      taosFreeQitem(pItem);
+      streamFreeQitem(pItem);
       return code;
     }

@@ -562,7 +562,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {

   taosMemoryFree(pBlock);
   if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) {
-    taosFreeQitem(pTranstate);
     return TSDB_CODE_OUT_OF_MEMORY;
   }
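Note: the queue hunks above tighten the ownership rule on the failure paths: the put function now releases the item exactly once (through streamDataSubmitDestroy/streamFreeQitem), so callers such as streamTaskPutTranstateIntoInputQ no longer free it again. A hedged caller-side sketch of that rule (illustrative only, the wrapper name is made up):

// After this change, a failed put has already released pItem inside
// streamTaskPutDataIntoInputQ(); the caller only reports the error.
static int32_t enqueueSketch(SStreamTask* pTask, SStreamQueueItem* pItem) {
  if (streamTaskPutDataIntoInputQ(pTask, pItem) < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;  // do NOT free pItem here a second time
  }
  return TSDB_CODE_SUCCESS;
}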
@@ -1084,7 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
   int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);

   if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
-    pStartInfo->readyTs = pTask->execInfo.start;
+    pStartInfo->readyTs = taosGetTimestampMs();
     pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;

     stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
@@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
   pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
   pTask->pMeta = pMeta;

-  pTask->chkInfo.checkpointVer = ver - 1;
-  pTask->chkInfo.nextProcessVer = ver;
+  pTask->chkInfo.checkpointVer = ver - 1;   // only update when generating checkpoint
+  pTask->chkInfo.processedVer = ver - 1;    // already processed version
+
+  pTask->chkInfo.nextProcessVer = ver;      // next processed version
   pTask->dataRange.range.maxVer = ver;
   pTask->dataRange.range.minVer = ver;
   pTask->pMsgCb = pMsgCb;

@@ -12,7 +12,7 @@ class StreamStateEnv : public ::testing::Test {
 protected:
   virtual void SetUp() {
     streamMetaInit();
-    backend = streamBackendInit(path, 0);
+    backend = streamBackendInit(path, 0, 0);
   }
   virtual void TearDown() {
     streamMetaCleanup();
@@ -37,11 +37,12 @@ float tsNumOfCores = 0;
 int64_t tsTotalMemoryKB = 0;
 char   *tsProcPath = NULL;

-char tsSIMDBuiltins = 0;
+char tsSIMDEnable = 0;
 char tsSSE42Enable = 0;
 char tsAVXEnable = 0;
 char tsAVX2Enable = 0;
 char tsFMAEnable = 0;
+char tsAVX512Enable = 0;

 void osDefaultInit() {
   taosSeedRand(taosSafeRand());

@@ -250,7 +250,7 @@ void taosGetSystemInfo() {
   taosGetCpuCores(&tsNumOfCores, false);
   taosGetTotalMemory(&tsTotalMemoryKB);
   taosGetCpuUsage(NULL, NULL);
-  taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable);
+  taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable, &tsAVX512Enable);
 #endif
 }
@@ -602,7 +602,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
       : "0"(level))

 // todo add for windows and mac
-int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
+int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512) {
 #ifdef WINDOWS
 #elif defined(_TD_DARWIN_64)
 #else

@@ -610,12 +610,6 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
 #ifdef _TD_X86_
   // Since the compiler is not support avx/avx2 instructions, the global variables always need to be
   // set to be false
-  //#if __AVX__ || __AVX2__
-  //  tsSIMDBuiltins = true;
-  //#else
-  //  tsSIMDBuiltins = false;
-  //#endif
-
   uint32_t eax = 0, ebx = 0, ecx = 0, edx = 0;

   int32_t ret = __get_cpuid(1, &eax, &ebx, &ecx, &edx);

@@ -631,6 +625,7 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) {
   // Ref to https://gcc.gnu.org/bugzilla/show_bug.cgi?id=77756
   __cpuid_fix(7u, eax, ebx, ecx, edx);
   *avx2 = (char)((ebx & bit_AVX2) == bit_AVX2);
+  *avx512 = (char)((ebx & bit_AVX512F) == bit_AVX512F);
 #endif  // _TD_X86_
 #endif
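Note: the hunk above probes AVX-512F through CPUID leaf 7 (EBX bit AVX512F) via the project's __cpuid_fix wrapper. A minimal standalone sketch of the same probe, assuming a GCC/Clang toolchain on x86-64 whose cpuid.h provides __get_cpuid_count and bit_AVX512F; this is illustrative, not the patch code:

#include <cpuid.h>
#include <stdio.h>

int main(void) {
  unsigned int eax = 0, ebx = 0, ecx = 0, edx = 0;
  // leaf 7, subleaf 0: extended feature flags
  int ok = __get_cpuid_count(7, 0, &eax, &ebx, &ecx, &edx);
  int avx512f = ok ? ((ebx & bit_AVX512F) == bit_AVX512F) : 0;
  printf("AVX512F supported: %d\n", avx512f);
  return 0;
}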
@@ -283,7 +283,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
   int32_t batch = num >> 2;
   int32_t remain = num & 0x03;
   if (selector == 0 || selector == 1) {
-    if (tsAVX2Enable && tsSIMDBuiltins) {
+    if (tsAVX2Enable && tsSIMDEnable) {
       for (int32_t i = 0; i < batch; ++i) {
         __m256i prev = _mm256_set1_epi64x(prev_value);
         _mm256_storeu_si256((__m256i *)&p[_pos], prev);

@@ -300,7 +300,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha
       }
     }
   } else {
-    if (tsAVX2Enable && tsSIMDBuiltins) {
+    if (tsAVX2Enable && tsSIMDEnable) {
       __m256i base = _mm256_set1_epi64x(w);
       __m256i maskVal = _mm256_set1_epi64x(mask);