Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0
This commit is contained in:
commit
bdfb8ab153
|
@ -169,11 +169,48 @@ ELSE ()
|
||||||
SET(COMPILER_SUPPORT_AVX512VL false)
|
SET(COMPILER_SUPPORT_AVX512VL false)
|
||||||
ELSE()
|
ELSE()
|
||||||
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
|
CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA)
|
||||||
CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX)
|
|
||||||
CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2)
|
|
||||||
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
|
CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F)
|
||||||
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
|
CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI)
|
||||||
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
|
CHECK_C_COMPILER_FLAG("-mavx512vl" COMPILER_SUPPORT_AVX512VL)
|
||||||
|
|
||||||
|
INCLUDE(CheckCSourceRuns)
|
||||||
|
SET(CMAKE_REQUIRED_FLAGS "-mavx")
|
||||||
|
check_c_source_runs("
|
||||||
|
#include <immintrin.h>
|
||||||
|
int main() {
|
||||||
|
__m256d a, b, c;
|
||||||
|
double buf[4] = {0};
|
||||||
|
a = _mm256_loadu_pd(buf);
|
||||||
|
b = _mm256_loadu_pd(buf);
|
||||||
|
c = _mm256_add_pd(a, b);
|
||||||
|
_mm256_storeu_pd(buf, c);
|
||||||
|
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
|
||||||
|
if (buf[i] != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
" COMPILER_SUPPORT_AVX)
|
||||||
|
|
||||||
|
SET(CMAKE_REQUIRED_FLAGS "-mavx2")
|
||||||
|
check_c_source_runs("
|
||||||
|
#include <immintrin.h>
|
||||||
|
int main() {
|
||||||
|
__m256i a, b, c;
|
||||||
|
int buf[8] = {0};
|
||||||
|
a = _mm256_loadu_si256((__m256i *)buf);
|
||||||
|
b = _mm256_loadu_si256((__m256i *)buf);
|
||||||
|
c = _mm256_and_si256(a, b);
|
||||||
|
_mm256_storeu_si256((__m256i *)buf, c);
|
||||||
|
for (int i = 0; i < sizeof(buf) / sizeof(buf[0]); ++i) {
|
||||||
|
if (buf[i] != 0) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
" COMPILER_SUPPORT_AVX2)
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
||||||
IF (COMPILER_SUPPORT_SSE42)
|
IF (COMPILER_SUPPORT_SSE42)
|
||||||
|
|
|
@ -10,7 +10,7 @@ description: 3.3.3.0 版本说明
|
||||||
4. TDengine支持macOS企业版客户端 [企业版]
|
4. TDengine支持macOS企业版客户端 [企业版]
|
||||||
5. taosX日志默认不写入syslog [企业版]
|
5. taosX日志默认不写入syslog [企业版]
|
||||||
6. 服务端记录所有慢查询信息到log库
|
6. 服务端记录所有慢查询信息到log库
|
||||||
7. show cluster machines 查询结果中添加服务端版本号
|
7. show cluster machines 查询结果中添加服务端版本号 [企业版]
|
||||||
8. 删除保留关键字LEVEL/ENCODE/COMPRESS, 可以作为列名/表名/数据库名等使用
|
8. 删除保留关键字LEVEL/ENCODE/COMPRESS, 可以作为列名/表名/数据库名等使用
|
||||||
9. 禁止动态修改临时目录
|
9. 禁止动态修改临时目录
|
||||||
10. round 函数:支持四舍五入的精度
|
10. round 函数:支持四舍五入的精度
|
||||||
|
|
|
@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||||
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
void streamMetaAcquireOneTask(SStreamTask* pTask);
|
int32_t streamMetaAcquireOneTask(SStreamTask* pTask);
|
||||||
void streamMetaClear(SStreamMeta* pMeta);
|
void streamMetaClear(SStreamMeta* pMeta);
|
||||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -84,7 +84,7 @@ void taos_cleanup(void) {
|
||||||
taosCloseRef(id);
|
taosCloseRef(id);
|
||||||
|
|
||||||
nodesDestroyAllocatorSet();
|
nodesDestroyAllocatorSet();
|
||||||
cleanupAppInfo();
|
// cleanupAppInfo();
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
tscDebug("rpc cleanup");
|
tscDebug("rpc cleanup");
|
||||||
|
|
||||||
|
|
|
@ -2625,6 +2625,8 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
||||||
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
|
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
|
||||||
lino = __LINE__;
|
lino = __LINE__;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
} else { // reset the length value
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
||||||
if (len >= size - 1) goto _exit;
|
if (len >= size - 1) goto _exit;
|
||||||
|
|
|
@ -74,10 +74,6 @@ void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) {
|
||||||
}
|
}
|
||||||
pInfo->mem_total = tsTotalMemoryKB;
|
pInfo->mem_total = tsTotalMemoryKB;
|
||||||
pInfo->disk_engine = 0;
|
pInfo->disk_engine = 0;
|
||||||
code = osUpdate();
|
|
||||||
if (code != 0) {
|
|
||||||
dError("failed to update os info since %s", tstrerror(code));
|
|
||||||
}
|
|
||||||
pInfo->disk_used = tsDataSpace.size.used;
|
pInfo->disk_used = tsDataSpace.size.used;
|
||||||
pInfo->disk_total = tsDataSpace.size.total;
|
pInfo->disk_total = tsDataSpace.size.total;
|
||||||
code = taosGetCardInfoDelta(&pInfo->net_in, &pInfo->net_out);
|
code = taosGetCardInfoDelta(&pInfo->net_in, &pInfo->net_out);
|
||||||
|
|
|
@ -1811,6 +1811,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
|
||||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
||||||
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
|
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
|
||||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
||||||
bool hasEpset = false;
|
bool hasEpset = false;
|
||||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||||
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
|
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
|
||||||
terrno = code;
|
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return terrno;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return terrno;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("set the resume action for trans:%d", pTrans->id);
|
mDebug("set the resume action for trans:%d", pTrans->id);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
|
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
|
||||||
|
|
|
@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
|
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
|
||||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
|
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
|
||||||
streamMetaAcquireOneTask(*ppTask);
|
int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask);
|
||||||
SStreamTask* pTask = *ppTask;
|
SStreamTask* pTask = *ppTask;
|
||||||
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pTask == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamTaskResume(pTask);
|
streamTaskResume(pTask);
|
||||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||||
|
|
||||||
|
@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
||||||
|
|
||||||
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
|
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
||||||
streamMutexUnlock(&pHTask->lock);
|
streamMutexUnlock(&pHTask->lock);
|
||||||
|
|
||||||
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
|
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
|
||||||
|
streamMetaReleaseTask(pMeta, pHTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -855,6 +855,7 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
|
||||||
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
|
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
clearBrinBlockIter(&iter);
|
clearBrinBlockIter(&iter);
|
||||||
|
tsdbError("invalid param, empty in tablescanInfoList, %s", pReader->idStr);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5256,7 +5257,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
|
||||||
// NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit
|
// NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit
|
||||||
// the data should be ingested in round-robin and all the child tables should be createted before ingesting data
|
// the data should be ingested in round-robin and all the child tables should be createted before ingesting data
|
||||||
// the version range of query will be used to identify the correctness of suspend/resume functions.
|
// the version range of query will be used to identify the correctness of suspend/resume functions.
|
||||||
// this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
|
// this function will be blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
|
||||||
#if SUSPEND_RESUME_TEST
|
#if SUSPEND_RESUME_TEST
|
||||||
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
|
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
|
||||||
tsem_wait(&pReader->resumeAfterSuspend);
|
tsem_wait(&pReader->resumeAfterSuspend);
|
||||||
|
@ -5909,6 +5910,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
|
||||||
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
|
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_INVALID_PARA;
|
code = TSDB_CODE_INVALID_PARA;
|
||||||
|
tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code));
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -633,47 +633,44 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_STREAM_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len);
|
if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != TSDB_CODE_SUCCESS) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_DROP: {
|
case TDMT_STREAM_TASK_DROP: {
|
||||||
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_UPDATE_CHKPT: {
|
case TDMT_STREAM_TASK_UPDATE_CHKPT: {
|
||||||
if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_CONSEN_CHKPT: {
|
case TDMT_STREAM_CONSEN_CHKPT: {
|
||||||
if (pVnode->restored) {
|
if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) {
|
||||||
if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_PAUSE: {
|
case TDMT_STREAM_TASK_PAUSE: {
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
||||||
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
(code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_TASK_RESUME: {
|
case TDMT_STREAM_TASK_RESUME: {
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
||||||
tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
|
(code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_STREAM_TASK_RESET: {
|
case TDMT_VND_STREAM_TASK_RESET: {
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode)) {
|
if (pVnode->restored && vnodeIsLeader(pVnode) &&
|
||||||
if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) {
|
(code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
needCommit = pVnode->config.hashChange;
|
needCommit = pVnode->config.hashChange;
|
||||||
|
@ -693,10 +690,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
case TDMT_VND_DROP_INDEX:
|
case TDMT_VND_DROP_INDEX:
|
||||||
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
|
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
|
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: // always return true
|
||||||
tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
|
tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_STREAM_TASK_UPDATE:
|
case TDMT_VND_STREAM_TASK_UPDATE: // always return true
|
||||||
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
|
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_COMPACT:
|
case TDMT_VND_COMPACT:
|
||||||
|
@ -752,7 +749,7 @@ _exit:
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||||
tstrerror(terrno), ver);
|
tstrerror(code), ver);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -551,7 +551,7 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||||
(int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
(int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
|
||||||
}
|
}
|
||||||
|
|
||||||
*len += tsnprintf(buf + VARSTR_HEADER_SIZE + *len, sizeof(type) - (VARSTR_HEADER_SIZE + *len), "%s`%s` %s",
|
*len += tsnprintf(buf + VARSTR_HEADER_SIZE + *len, SHOW_CREATE_TB_RESULT_FIELD2_LEN - (VARSTR_HEADER_SIZE + *len), "%s`%s` %s",
|
||||||
((i > 0) ? ", " : ""), pSchema->name, type);
|
((i > 0) ? ", " : ""), pSchema->name, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here
|
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here
|
||||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||||
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
|
|
@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||||
streamMetaAcquireOneTask(pTask);
|
|
||||||
|
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
|
||||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||||
"trigger-recv-monitor");
|
"trigger-recv-monitor");
|
||||||
pTmrInfo->launchChkptId = pActiveInfo->activeId;
|
pTmrInfo->launchChkptId = pActiveInfo->activeId;
|
||||||
|
|
|
@ -1162,7 +1162,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||||
streamMetaAcquireOneTask(pTask);
|
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
|
||||||
|
|
||||||
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||||
"chkpt-ready-monitor");
|
"chkpt-ready-monitor");
|
||||||
|
|
|
@ -753,12 +753,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaAcquireOneTask(SStreamTask* pTask) {
|
int32_t streamMetaAcquireOneTask(SStreamTask* pTask) {
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||||
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
|
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
|
||||||
|
return ref;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
|
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
|
||||||
|
if (pTask == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t taskId = pTask->id.taskId;
|
int32_t taskId = pTask->id.taskId;
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
||||||
|
|
||||||
|
@ -862,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask) {
|
if (ppTask) {
|
||||||
pTask = *ppTask;
|
pTask = *ppTask;
|
||||||
// it is an fill-history task, remove the related stream task's id that points to it
|
// it is a fill-history task, remove the related stream task's id that points to it
|
||||||
if (pTask->info.fillHistory == 0) {
|
if (pTask->info.fillHistory == 0) {
|
||||||
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
|
||||||
void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||||
int64_t delaySchema = pTask->info.delaySchedParam;
|
int64_t delaySchema = pTask->info.delaySchedParam;
|
||||||
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
|
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
int32_t ref = streamMetaAcquireOneTask(pTask);
|
||||||
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
|
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
|
||||||
pTask->info.delaySchedParam);
|
pTask->info.delaySchedParam);
|
||||||
|
|
||||||
|
@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
||||||
pRunReq->reqType = execType;
|
pRunReq->reqType = execType;
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||||
|
if (code) {
|
||||||
|
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
|
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
|
||||||
|
@ -76,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
|
||||||
pTask->status.schedIdleTime, ref);
|
pTask->status.schedIdleTime, ref);
|
||||||
|
|
||||||
// add one ref count for task
|
// add one ref count for task
|
||||||
streamMetaAcquireOneTask(pTask);
|
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
|
||||||
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
|
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
|
||||||
pTask->pMeta->vgId, "resume-task-tmr");
|
pTask->pMeta->vgId, "resume-task-tmr");
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (pTask->inputq.queue) {
|
if (pTask->inputq.queue) {
|
||||||
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
|
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
|
||||||
|
pTask->inputq.queue = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->outputq.queue) {
|
if (pTask->outputq.queue) {
|
||||||
streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
|
streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
|
||||||
|
pTask->outputq.queue = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->exec.qmsg) {
|
if (pTask->exec.qmsg) {
|
||||||
|
@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (pTask->exec.pWalReader != NULL) {
|
if (pTask->exec.pWalReader != NULL) {
|
||||||
walCloseReader(pTask->exec.pWalReader);
|
walCloseReader(pTask->exec.pWalReader);
|
||||||
|
pTask->exec.pWalReader = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
|
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
|
||||||
|
|
|
@ -501,9 +501,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
ETaskStatus s = pSM->current.state;
|
ETaskStatus s = pSM->current.state;
|
||||||
|
|
||||||
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP &&
|
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
|
||||||
s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) {
|
s != TASK_STATUS__READY) {
|
||||||
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
|
||||||
|
GET_EVT_NAME(pSM->prev.evt));
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
||||||
|
@ -521,11 +522,15 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repeat pause will not overwrite the previous pause state
|
||||||
|
if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) {
|
||||||
keepPrevInfo(pSM);
|
keepPrevInfo(pSM);
|
||||||
|
|
||||||
pSM->current = pTrans->next;
|
pSM->current = pTrans->next;
|
||||||
pSM->pActiveTrans = NULL;
|
} else {
|
||||||
|
stDebug("s-task:%s repeat pause evt recv, not update prev status", id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pSM->pActiveTrans = NULL;
|
||||||
// todo remove it
|
// todo remove it
|
||||||
// todo: handle the error code
|
// todo: handle the error code
|
||||||
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
||||||
|
|
|
@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
|
stTrace("vgId:%d start %s tmr succ", vgId, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTmrStop(tmr_h tmrId) {
|
void streamTmrStop(tmr_h tmrId) {
|
||||||
|
|
|
@ -24,7 +24,6 @@ extern "C" {
|
||||||
|
|
||||||
#define TIMER_MAX_MS 0x7FFFFFFF
|
#define TIMER_MAX_MS 0x7FFFFFFF
|
||||||
#define PING_TIMER_MS 5000
|
#define PING_TIMER_MS 5000
|
||||||
#define HEARTBEAT_TICK_NUM 20
|
|
||||||
|
|
||||||
typedef struct SSyncEnv {
|
typedef struct SSyncEnv {
|
||||||
uint8_t isStart;
|
uint8_t isStart;
|
||||||
|
|
|
@ -977,9 +977,10 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||||
pData->logicClock = pSyncTimer->logicClock;
|
pData->logicClock = pSyncTimer->logicClock;
|
||||||
pData->execTime = tsNow + pSyncTimer->timerMS;
|
pData->execTime = tsNow + pSyncTimer->timerMS;
|
||||||
|
|
||||||
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);
|
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid,
|
||||||
|
pData->destId.addr, pSyncTimer->timerMS);
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
|
TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid),
|
||||||
syncEnv()->pTimerManager, &pSyncTimer->pTimer));
|
syncEnv()->pTimerManager, &pSyncTimer->pTimer));
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
@ -2711,7 +2712,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sTrace("vgId:%d, eq peer hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, pData->destId.addr);
|
sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid,
|
||||||
|
pData->destId.addr);
|
||||||
|
|
||||||
if (pSyncNode->totalReplicaNum > 1) {
|
if (pSyncNode->totalReplicaNum > 1) {
|
||||||
int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
|
int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
|
||||||
|
@ -2753,13 +2755,12 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
|
sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
|
sTrace("vgId:%d, reset peer hb timer at %d", pSyncNode->vgId, pSyncTimer->timerMS);
|
||||||
if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM,
|
if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, (void*)hbDataRid,
|
||||||
(void*)hbDataRid, syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
|
syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
|
||||||
sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
|
sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
syncHbTimerDataRelease(pData);
|
syncHbTimerDataRelease(pData);
|
||||||
|
|
|
@ -200,6 +200,7 @@ void* taosArrayPop(SArray* pArray) {
|
||||||
void* taosArrayGet(const SArray* pArray, size_t index) {
|
void* taosArrayGet(const SArray* pArray, size_t index) {
|
||||||
if (NULL == pArray) {
|
if (NULL == pArray) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
uError("failed to return value from array of null ptr");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue