diff --git a/docs/zh/12-taos-sql/24-show.md b/docs/zh/12-taos-sql/24-show.md index db9e3fd136..4bd1e52284 100644 --- a/docs/zh/12-taos-sql/24-show.md +++ b/docs/zh/12-taos-sql/24-show.md @@ -211,10 +211,10 @@ SHOW USERS; 显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。 -## SHOW VARIABLES +## SHOW CLUSTER VARIABLES(3.0.1.6 之前为 SHOW VARIABLES) ```sql -SHOW VARIABLES; +SHOW CLUSTER VARIABLES; SHOW DNODE dnode_id VARIABLES; ``` diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 7df6124152..99bbfde3e1 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -126,6 +126,9 @@ extern char tsSmlChildTableName[]; extern char tsSmlTagName[]; extern bool tsSmlDataFormat; +// wal +extern int64_t tsWalFsyncDataSizeLimit; + // internal extern int32_t tsTransPullupInterval; extern int32_t tsMqRebalanceInterval; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 67c2a9081c..76b13579c1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -57,10 +57,11 @@ extern int32_t tMsgDict[]; #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_INFO(TYPE) \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ - (TYPE) < TDMT_VND_TMQ_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_VND_STREAM_MAX_MSG || \ - (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \ + (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \ + (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ : 0 + #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3d9d55b101..70145b434d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -139,6 +139,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) @@ -147,7 +148,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) @@ -184,6 +186,21 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED1, "vnode-unused1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED2, "vnode-unused2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED3, "vnode-unused3", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED4, "vnode-unused4", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED5, "vnode-unused5", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED6, "vnode-unused6", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED7, "vnode-unused7", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED8, "vnode-unused8", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED9, "vnode-unused9", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED10, "vnode-unused10", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED11, "vnode-unused11", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED12, "vnode-unused12", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED13, "vnode-unused13", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED14, "vnode-unused14", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_UNUSED15, "vnode-unused15", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) @@ -215,30 +232,17 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) - TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) @@ -270,6 +274,22 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + + TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL) + + #if defined(TD_MSG_NUMBER_) TDMT_MAX #endif diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 08dba5d50d..ed85fd3517 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -43,7 +43,6 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) -#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE) typedef enum { TAOS_WAL_WRITE = 1, @@ -159,6 +158,7 @@ void walCleanUp(); // handle open and ctl SWal *walOpen(const char *path, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg); +int32_t walPersist(SWal *); void walClose(SWal *); // write interfaces diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 660730f0dc..7f4a826c5e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR // udf bool tsStartUdfd = true; +// wal +int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L); + // internal int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; @@ -423,6 +426,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0) + return -1; + if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; @@ -722,6 +728,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; + tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); diff --git a/source/dnode/mgmt/node_util/src/dmUtil.c b/source/dnode/mgmt/node_util/src/dmUtil.c index 80bb8debd2..648a9ab9ce 100644 --- a/source/dnode/mgmt/node_util/src/dmUtil.c +++ b/source/dnode/mgmt/node_util/src/dmUtil.c @@ -55,7 +55,7 @@ void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool need } void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) { - taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); + taosGetCpuUsage(&pInfo->cpu_system, &pInfo->cpu_engine); taosGetCpuCores(&pInfo->cpu_cores); taosGetProcMemory(&pInfo->mem_engine); taosGetSysMemory(&pInfo->mem_system); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index aee1a7a75c..df999316eb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -66,7 +66,7 @@ int32_t mndInitConsumer(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); - mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); + mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index f47637bb50..f604a9289a 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -105,7 +105,7 @@ static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } } @@ -490,7 +490,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncState state = syncGetState(pMnode->syncMgmt.sync); - if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || + if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47a40ea495..094db9ebd0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1268,7 +1268,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) { + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) { tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); continue; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2f6ec0c39f..48e69f8f4d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -199,6 +199,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea goto END; } + tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, + TMSG_INFO((*ppCkHead)->head.msgType)); + if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); @@ -216,17 +219,20 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea SWalCont* pHead = &((*ppCkHead)->head); if (IS_META_MSG(pHead->msgType)) { code = walFetchBody(pHandle->pWalReader, ppCkHead); - if (code < 0) { ASSERT(0); *fetchOffset = offset; code = -1; goto END; } + if (isValValidForTable(pHandle, pHead)) { *fetchOffset = offset; code = 0; goto END; + } else { + offset++; + continue; } } } @@ -586,8 +592,39 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } } + } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + if (isAdd) { + SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) { + uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i); + + int32_t code = metaGetTableEntryByUid(&mr, *id); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + continue; + } + + tDecoderClear(&mr.coder); + + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) { + tqDebug("table uid %" PRId64 " does not add to tq handle", *id); + continue; + } + tqDebug("table uid %" PRId64 " add to tq handle", *id); + taosArrayPush(qa, id); + } + metaReaderClear(&mr); + if (taosArrayGetSize(qa) > 0) { + tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa); + } + taosArrayDestroy(qa); + } else { + // TODO handle delete table from stb + } } else { - // tq update id + ASSERT(0); } } while (1) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a964a46406..76236e5078 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -684,7 +684,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (*state->pDataFReader != NULL) { tsdbDataFReaderClose(state->pDataFReader); - resetLastBlockLoadInfo(state->pLoadInfo); + // resetLastBlockLoadInfo(state->pLoadInfo); } code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); @@ -764,7 +764,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (--state->iBlock < 0) { tsdbDataFReaderClose(state->pDataFReader); *state->pDataFReader = NULL; - resetLastBlockLoadInfo(state->pLoadInfo); + // resetLastBlockLoadInfo(state->pLoadInfo); if (state->aBlockIdx) { taosArrayDestroy(state->aBlockIdx); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e593bbd602..7040d2d7c8 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); + // persist wal before starting + if (walPersist(pVnode->pWal) < 0) { + vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); + return -1; + } + pVnode->state.commitTerm = pVnode->state.applyTerm; // save info diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cfedd70711..d6c77afa66 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -285,8 +285,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_COMMIT: goto _do_commit; default: - ASSERT(0); - break; + vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); + return -1; } vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code, diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d8340b72f2..98f8d57fc6 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -621,9 +621,10 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { } static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { - void* ite = NULL; - while ((ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL) { - taosArrayDestroy(((SDataGroupInfo*)ite)->pPageList); + int32_t size = taosArrayGetSize(pInfo->sortedGroupArray); + for (int32_t i = 0; i < size; i++) { + SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i); + taosArrayDestroy(pGp->pPageList); } taosArrayClear(pInfo->sortedGroupArray); clearDiskbasedBuf(pInfo->pBuf); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5229b46815..e54b889a7a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -344,7 +344,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } -static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { +static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, + int32_t rows) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; @@ -1878,7 +1879,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif -#if 1 if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; @@ -1914,7 +1914,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return NULL; } -#endif size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor @@ -2296,7 +2295,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; - pOperator->name = "RawStreamScanOperator"; + pOperator->name = "RawScanOperator"; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -4384,8 +4383,9 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); + int32_t code = + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4501,8 +4501,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); + int32_t code = + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 459ee583a4..85b14d6017 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -124,13 +124,16 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { char pathTaosdLdLib[512] = {0}; size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); - uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); + int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); + if (ret != UV_ENOBUFS) { + taosdLdLibPathLen = strlen(pathTaosdLdLib); + } char udfdPathLdLib[1024] = {0}; size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath); strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen); udfdPathLdLib[udfdLdLibPathLen] = ':'; - strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen); + strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1); if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) { fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib); } else { diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 1aea0e8148..6dc9922981 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -34,6 +34,7 @@ typedef struct { int64_t createTs; int64_t closeTs; int64_t fileSize; + int64_t syncedOffset; } SWalFileInfo; typedef struct WalIdxEntry { @@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) { return pInfo->fileSize; } +static inline int64_t walGetLastFileCachedSize(SWal* pWal) { + if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0; + SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return (pInfo->fileSize - pInfo->syncedOffset); +} + static inline int64_t walGetLastFileFirstVer(SWal* pWal) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ad27a50f68..3baa906b19 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -16,6 +16,7 @@ #include "cJSON.h" #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "tutil.h" #include "walInt.h" @@ -65,32 +66,43 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { // ensure size as non-negative pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); + int64_t stepSize = WAL_SCAN_BUF_SIZE; uint64_t magic = WAL_MAGIC; int64_t walCkHeadSz = sizeof(SWalCkHead); int64_t end = fileSize; - int64_t offset = 0; int64_t capacity = 0; int64_t readSize = 0; char* buf = NULL; - int64_t found = -1; bool firstTrial = pFileInfo->fileSize < fileSize; + int64_t offset = TMIN(pFileInfo->fileSize, fileSize); + int64_t offsetForward = offset - stepSize + walCkHeadSz - 1; + int64_t offsetBackward = offset; + int64_t retVer = -1; + int64_t lastEntryBeginOffset = 0; + int64_t lastEntryEndOffset = 0; + + // check recover size + if (2 * tsWalFsyncDataSizeLimit + offset < end) { + wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 + ", end:%" PRId64 ", file:%s", + pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr); + } // search for the valid last WAL entry, e.g. block by block while (1) { - offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); + offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1) + : TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1); + end = TMIN(offset + stepSize, fileSize); + if (firstTrial) { + offsetForward = offset; + } else { + offsetBackward = offset; + } + ASSERT(offset <= end); readSize = end - offset; capacity = readSize + sizeof(magic); - int64_t limit = WAL_RECOV_SIZE_LIMIT; - if (limit < readSize) { - wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64 - ", end:%" PRId64 ", file:%s", - pWal->cfg.vgId, limit, offset, end, fnameStr); - terrno = TSDB_CODE_WAL_SIZE_LIMIT; - goto _err; - } - void* ptr = taosMemoryRealloc(buf, capacity); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; @@ -127,6 +139,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } logContent = (SWalCkHead*)(buf + pos); if (walValidHeadCksum(logContent) != 0) { + terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH; wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, offset + pos, fnameStr); haystack = buf + pos + 1; @@ -179,46 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { } // found one - found = pos; + retVer = logContent->head.version; + lastEntryBeginOffset = offset + pos; + lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen; + + // try next haystack = buf + pos + 1; } - if (found >= 0 || offset == 0) break; - - // go backwards, e.g. by at most one WAL scan buf size - end = TMIN(offset + walCkHeadSz - 1, fileSize); - firstTrial = false; + if (end == fileSize) firstTrial = false; + if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue; + if (retVer >= 0 || offset == 0) break; } - // determine end of last entry - SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL; - int64_t retVer = -1; - int64_t lastEntryBeginOffset = 0; - int64_t lastEntryEndOffset = 0; - - if (lastEntry == NULL) { + if (retVer < 0) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - } else { - retVer = lastEntry->head.version; - lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf); - lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; } // truncate file if (lastEntryEndOffset != fileSize) { wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr, lastEntryEndOffset, fileSize); + if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + if (taosFsyncFile(pFile) < 0) { wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } } + pFileInfo->fileSize = lastEntryEndOffset; taosCloseFile(&pFile); @@ -621,6 +629,7 @@ int walRollFileInfo(SWal* pWal) { pNewInfo->createTs = ts; pNewInfo->closeTs = -1; pNewInfo->fileSize = 0; + pNewInfo->syncedOffset = 0; taosArrayPush(pArray, pNewInfo); taosMemoryFree(pNewInfo); return 0; @@ -771,6 +780,12 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } +void walUpdateSyncedOffset(SWal* pWal) { + SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal); + if (pFileInfo == NULL) return; + pFileInfo->syncedOffset = pFileInfo->fileSize; +} + int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; @@ -790,6 +805,9 @@ int walSaveMeta(SWal* pWal) { return -1; } + // update synced offset + (void)walUpdateSyncedOffset(pWal); + // flush to a tmpfile n = walBuildTmpMetaName(pWal, tmpFnameStr); ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 0df1a6b387..1a70a3038f 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } +int32_t walPersist(SWal *pWal) { + taosThreadMutexLock(&pWal->mutex); + int32_t ret = walSaveMeta(pWal); + taosThreadMutexUnlock(&pWal->mutex); + return ret; +} + void walClose(SWal *pWal) { taosThreadMutexLock(&pWal->mutex); (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 8f493ddd85..1350ca0c37 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -198,7 +198,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { return -1; } - wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId, + wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId, pReader->curVersion, pReader->curInvalid, ver); pReader->curVersion = ver; @@ -347,22 +347,47 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int64_t code; + int64_t contLen; + bool seeked = false; + + wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + ", applied ver:%" PRId64, + pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, + pRead->pWal->vers.appliedVer); // TODO: valid ver - if (ver > pRead->pWal->vers.commitVer) { + if (ver > pRead->pWal->vers.appliedVer) { return -1; } if (pRead->curInvalid || pRead->curVersion != ver) { code = walReadSeekVer(pRead, ver); - if (code < 0) return -1; + if (code < 0) { + pRead->curVersion = ver; + pRead->curInvalid = 1; + return -1; + } + seeked = true; } - ASSERT(taosValidFile(pRead->pLogFile) == true); - - code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); - if (code != sizeof(SWalCkHead)) { - return -1; + while (1) { + contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); + if (contLen == sizeof(SWalCkHead)) { + break; + } else if (contLen == 0 && !seeked) { + walReadSeekVerImpl(pRead, ver); + seeked = true; + continue; + } else { + if (contLen < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + } else { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + ASSERT(0); + pRead->curInvalid = 1; + return -1; + } } code = walValidHeadCksum(pHead); @@ -373,13 +398,20 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { return -1; } + pRead->curInvalid = 0; return 0; } int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; - // ASSERT(pRead->curVersion == pHead->head.version); + wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + ", applied ver:%" PRId64, + pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, + pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); + + ASSERT(pRead->curVersion == pHead->head.version); + ASSERT(pRead->curInvalid == 0); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { @@ -397,6 +429,11 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { SWalCont *pReadHead = &((*ppHead)->head); int64_t ver = pReadHead->version; + wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + ", applied ver:%" PRId64, + pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, + pRead->pWal->vers.appliedVer); + if (pRead->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { @@ -409,19 +446,32 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { + if (pReadHead->bodyLen < 0) { + ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", + pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); + } else { + wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted", + pRead->pWal->cfg.vgId, pReadHead->version, ver); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + } + pRead->curInvalid = 1; ASSERT(0); return -1; } if (pReadHead->version != ver) { + ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pRead->pHead->head.version, ver); + pReadHead->version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } if (walValidBodyCksum(*ppHead) != 0) { + ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index e016e72471..c723618a4b 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -16,6 +16,7 @@ #include "os.h" #include "taoserror.h" #include "tchecksum.h" +#include "tglobal.h" #include "walInt.h" int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { @@ -252,23 +253,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { } } + if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) { + if (walSaveMeta(pWal) < 0) { + return -1; + } + } + return 0; } int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { + taosThreadMutexLock(&pWal->mutex); + pWal->vers.verInSnapshotting = ver; wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); // check file rolling if (pWal->cfg.retentionPeriod == 0) { - taosThreadMutexLock(&pWal->mutex); if (walGetLastFileSize(pWal) != 0) { - walRollImpl(pWal); + if (walRollImpl(pWal) < 0) { + wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr()); + goto _err; + } } - taosThreadMutexUnlock(&pWal->mutex); } - + taosThreadMutexUnlock(&pWal->mutex); return 0; + +_err: + taosThreadMutexUnlock(&pWal->mutex); + return -1; } int32_t walEndSnapshot(SWal *pWal) { diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 0a6dad4819..e5ca9faacb 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -227,9 +227,7 @@ void taosGetSystemInfo() { #ifdef WINDOWS taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - - double tmp1, tmp2, tmp3, tmp4; - taosGetCpuUsage(&tmp1, &tmp2); + taosGetCpuUsage(NULL, NULL); #elif defined(_TD_DARWIN_64) long physical_pages = sysconf(_SC_PHYS_PAGES); long page_size = sysconf(_SC_PAGESIZE); @@ -240,9 +238,7 @@ void taosGetSystemInfo() { taosGetProcIOnfos(); taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - - double tmp1, tmp2, tmp3, tmp4; - taosGetCpuUsage(&tmp1, &tmp2); + taosGetCpuUsage(NULL, NULL); #endif } @@ -447,8 +443,8 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { static int64_t curSysTotal = 0; static int64_t curProcTotal = 0; - *cpu_system = 0; - *cpu_engine = 0; + if (cpu_system != NULL) *cpu_system = 0; + if (cpu_engine != NULL) *cpu_engine = 0; SysCpuInfo sysCpu = {0}; ProcCpuInfo procCpu = {0}; @@ -458,8 +454,12 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) { - *cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; - *cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; + if (cpu_system != NULL) { + *cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; + } + if (cpu_engine != NULL) { + *cpu_engine = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; + } } lastSysUsed = curSysUsed;