Merge remote-tracking branch 'origin/3.0' into fix/TD-20052

This commit is contained in:
Shengliang Guan 2022-11-02 10:26:19 +08:00
commit 1c77d5381a
23 changed files with 273 additions and 97 deletions

View File

@ -211,10 +211,10 @@ SHOW USERS;
显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。 显示当前系统中所有用户的信息。包括用户自定义的用户和系统默认用户。
## SHOW VARIABLES ## SHOW CLUSTER VARIABLES(3.0.1.6 之前为 SHOW VARIABLES)
```sql ```sql
SHOW VARIABLES; SHOW CLUSTER VARIABLES;
SHOW DNODE dnode_id VARIABLES; SHOW DNODE dnode_id VARIABLES;
``` ```

View File

@ -126,6 +126,9 @@ extern char tsSmlChildTableName[];
extern char tsSmlTagName[]; extern char tsSmlTagName[];
extern bool tsSmlDataFormat; extern bool tsSmlDataFormat;
// wal
extern int64_t tsWalFsyncDataSizeLimit;
// internal // internal
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;

View File

@ -57,10 +57,11 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \ #define TMSG_INFO(TYPE) \
((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
(TYPE) < TDMT_VND_TMQ_MAX_MSG || (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_VND_STREAM_MAX_MSG || \ (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \
(TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) \ (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0 : 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef uint16_t tmsg_t; typedef uint16_t tmsg_t;

View File

@ -139,6 +139,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED1, "unused", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
@ -147,7 +148,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
@ -184,6 +186,21 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED1, "vnode-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED2, "vnode-unused2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED3, "vnode-unused3", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED4, "vnode-unused4", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED5, "vnode-unused5", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED6, "vnode-unused6", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED7, "vnode-unused7", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED8, "vnode-unused8", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED9, "vnode-unused9", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED10, "vnode-unused10", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED11, "vnode-unused11", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED12, "vnode-unused12", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED13, "vnode-unused13", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED14, "vnode-unused14", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_UNUSED15, "vnode-unused15", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
@ -215,30 +232,17 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
@ -270,6 +274,22 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
#if defined(TD_MSG_NUMBER_) #if defined(TD_MSG_NUMBER_)
TDMT_MAX TDMT_MAX
#endif #endif

View File

@ -43,7 +43,6 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
typedef enum { typedef enum {
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
@ -159,6 +158,7 @@ void walCleanUp();
// handle open and ctl // handle open and ctl
SWal *walOpen(const char *path, SWalCfg *pCfg); SWal *walOpen(const char *path, SWalCfg *pCfg);
int32_t walAlter(SWal *, SWalCfg *pCfg); int32_t walAlter(SWal *, SWalCfg *pCfg);
int32_t walPersist(SWal *);
void walClose(SWal *); void walClose(SWal *);
// write interfaces // write interfaces

View File

@ -156,6 +156,9 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR
// udf // udf
bool tsStartUdfd = true; bool tsStartUdfd = true;
// wal
int64_t tsWalFsyncDataSizeLimit = (100 * 1024 * 1024L);
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
@ -423,6 +426,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0)
return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
@ -722,6 +728,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32; tsUptimeInterval = cfgGetItem(pCfg, "uptimeInterval")->i32;
tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32; tsQueryRsmaTolerance = cfgGetItem(pCfg, "queryRsmaTolerance")->i32;
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));

View File

@ -55,7 +55,7 @@ void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, void *nodeMsgFp, bool need
} }
void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) { void dmGetMonitorSystemInfo(SMonSysInfo *pInfo) {
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); taosGetCpuUsage(&pInfo->cpu_system, &pInfo->cpu_engine);
taosGetCpuCores(&pInfo->cpu_cores); taosGetCpuCores(&pInfo->cpu_cores);
taosGetProcMemory(&pInfo->mem_engine); taosGetProcMemory(&pInfo->mem_engine);
taosGetSysMemory(&pInfo->mem_system); taosGetSysMemory(&pInfo->mem_system);

View File

@ -66,7 +66,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);

View File

@ -105,7 +105,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
} }
@ -490,7 +490,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SSyncState state = syncGetState(pMnode->syncMgmt.sync); SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) { pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,

View File

@ -1268,7 +1268,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__RECOVER1) { if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
continue; continue;
} }

View File

@ -199,6 +199,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
goto END; goto END;
} }
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
TMSG_INFO((*ppCkHead)->head.msgType));
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);
@ -216,17 +219,20 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
SWalCont* pHead = &((*ppCkHead)->head); SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) { if (IS_META_MSG(pHead->msgType)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
*fetchOffset = offset; *fetchOffset = offset;
code = -1; code = -1;
goto END; goto END;
} }
if (isValValidForTable(pHandle, pHead)) { if (isValValidForTable(pHandle, pHead)) {
*fetchOffset = offset; *fetchOffset = offset;
code = 0; code = 0;
goto END; goto END;
} else {
offset++;
continue;
} }
} }
} }
@ -586,8 +592,39 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
} }
} }
} else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
continue;
}
tDecoderClear(&mr.coder);
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) {
tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
continue;
}
tqDebug("table uid %" PRId64 " add to tq handle", *id);
taosArrayPush(qa, id);
}
metaReaderClear(&mr);
if (taosArrayGetSize(qa) > 0) {
tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa);
}
taosArrayDestroy(qa);
} else {
// TODO handle delete table from stb
}
} else { } else {
// tq update id ASSERT(0);
} }
} }
while (1) { while (1) {

View File

@ -684,7 +684,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (*state->pDataFReader != NULL) { if (*state->pDataFReader != NULL) {
tsdbDataFReaderClose(state->pDataFReader); tsdbDataFReaderClose(state->pDataFReader);
resetLastBlockLoadInfo(state->pLoadInfo); // resetLastBlockLoadInfo(state->pLoadInfo);
} }
code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet); code = tsdbDataFReaderOpen(state->pDataFReader, state->pTsdb, pFileSet);
@ -764,7 +764,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
if (--state->iBlock < 0) { if (--state->iBlock < 0) {
tsdbDataFReaderClose(state->pDataFReader); tsdbDataFReaderClose(state->pDataFReader);
*state->pDataFReader = NULL; *state->pDataFReader = NULL;
resetLastBlockLoadInfo(state->pLoadInfo); // resetLastBlockLoadInfo(state->pLoadInfo);
if (state->aBlockIdx) { if (state->aBlockIdx) {
taosArrayDestroy(state->aBlockIdx); taosArrayDestroy(state->aBlockIdx);

View File

@ -212,6 +212,12 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied); pVnode->state.applied);
// persist wal before starting
if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
return -1;
}
pVnode->state.commitTerm = pVnode->state.applyTerm; pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info // save info

View File

@ -285,8 +285,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case TDMT_VND_COMMIT: case TDMT_VND_COMMIT:
goto _do_commit; goto _do_commit;
default: default:
ASSERT(0); vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
break; return -1;
} }
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code, vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,

View File

@ -621,9 +621,10 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
} }
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) { static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
void* ite = NULL; int32_t size = taosArrayGetSize(pInfo->sortedGroupArray);
while ((ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL) { for (int32_t i = 0; i < size; i++) {
taosArrayDestroy(((SDataGroupInfo*)ite)->pPageList); SDataGroupInfo* pGp = taosArrayGet(pInfo->sortedGroupArray, i);
taosArrayDestroy(pGp->pPageList);
} }
taosArrayClear(pInfo->sortedGroupArray); taosArrayClear(pInfo->sortedGroupArray);
clearDiskbasedBuf(pInfo->pBuf); clearDiskbasedBuf(pInfo->pBuf);

View File

@ -344,7 +344,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return true; return true;
} }
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
int32_t rows) {
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup; SExprSupp* pSup = &pTableScanInfo->pseudoSup;
@ -1878,7 +1879,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
#endif #endif
#if 1
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
@ -1914,7 +1914,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
#endif
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor // TODO: refactor
@ -2296,7 +2295,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator"; pOperator->name = "RawScanOperator";
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
@ -4384,8 +4383,9 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, int32_t code =
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
@ -4501,8 +4501,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, int32_t code =
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }

View File

@ -124,13 +124,16 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char pathTaosdLdLib[512] = {0}; char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
char udfdPathLdLib[1024] = {0}; char udfdPathLdLib[1024] = {0};
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath); size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen); strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen);
udfdPathLdLib[udfdLdLibPathLen] = ':'; udfdPathLdLib[udfdLdLibPathLen] = ':';
strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen); strncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) { if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib); fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
} else { } else {

View File

@ -34,6 +34,7 @@ typedef struct {
int64_t createTs; int64_t createTs;
int64_t closeTs; int64_t closeTs;
int64_t fileSize; int64_t fileSize;
int64_t syncedOffset;
} SWalFileInfo; } SWalFileInfo;
typedef struct WalIdxEntry { typedef struct WalIdxEntry {
@ -66,6 +67,12 @@ static inline int64_t walGetLastFileSize(SWal* pWal) {
return pInfo->fileSize; return pInfo->fileSize;
} }
static inline int64_t walGetLastFileCachedSize(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);
return (pInfo->fileSize - pInfo->syncedOffset);
}
static inline int64_t walGetLastFileFirstVer(SWal* pWal) { static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1;
SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet);

View File

@ -16,6 +16,7 @@
#include "cJSON.h" #include "cJSON.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "tutil.h" #include "tutil.h"
#include "walInt.h" #include "walInt.h"
@ -65,32 +66,43 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
// ensure size as non-negative // ensure size as non-negative
pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize); pFileInfo->fileSize = TMAX(0, pFileInfo->fileSize);
int64_t stepSize = WAL_SCAN_BUF_SIZE;
uint64_t magic = WAL_MAGIC; uint64_t magic = WAL_MAGIC;
int64_t walCkHeadSz = sizeof(SWalCkHead); int64_t walCkHeadSz = sizeof(SWalCkHead);
int64_t end = fileSize; int64_t end = fileSize;
int64_t offset = 0;
int64_t capacity = 0; int64_t capacity = 0;
int64_t readSize = 0; int64_t readSize = 0;
char* buf = NULL; char* buf = NULL;
int64_t found = -1;
bool firstTrial = pFileInfo->fileSize < fileSize; bool firstTrial = pFileInfo->fileSize < fileSize;
int64_t offset = TMIN(pFileInfo->fileSize, fileSize);
int64_t offsetForward = offset - stepSize + walCkHeadSz - 1;
int64_t offsetBackward = offset;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
// check recover size
if (2 * tsWalFsyncDataSizeLimit + offset < end) {
wWarn("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
", end:%" PRId64 ", file:%s",
pWal->cfg.vgId, 2 * tsWalFsyncDataSizeLimit, offset, end, fnameStr);
}
// search for the valid last WAL entry, e.g. block by block // search for the valid last WAL entry, e.g. block by block
while (1) { while (1) {
offset = (firstTrial) ? pFileInfo->fileSize : TMAX(0, end - WAL_SCAN_BUF_SIZE); offset = (firstTrial) ? TMIN(fileSize, offsetForward + stepSize - walCkHeadSz + 1)
: TMAX(0, offsetBackward - stepSize + walCkHeadSz - 1);
end = TMIN(offset + stepSize, fileSize);
if (firstTrial) {
offsetForward = offset;
} else {
offsetBackward = offset;
}
ASSERT(offset <= end); ASSERT(offset <= end);
readSize = end - offset; readSize = end - offset;
capacity = readSize + sizeof(magic); capacity = readSize + sizeof(magic);
int64_t limit = WAL_RECOV_SIZE_LIMIT;
if (limit < readSize) {
wError("vgId:%d, possibly corrupted WAL range exceeds size limit (i.e. %" PRId64 " bytes). offset:%" PRId64
", end:%" PRId64 ", file:%s",
pWal->cfg.vgId, limit, offset, end, fnameStr);
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
goto _err;
}
void* ptr = taosMemoryRealloc(buf, capacity); void* ptr = taosMemoryRealloc(buf, capacity);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
@ -127,6 +139,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
} }
logContent = (SWalCkHead*)(buf + pos); logContent = (SWalCkHead*)(buf + pos);
if (walValidHeadCksum(logContent) != 0) { if (walValidHeadCksum(logContent) != 0) {
terrno = TSDB_CODE_WAL_CHKSUM_MISMATCH;
wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, wWarn("vgId:%d, failed to validate checksum of wal entry header. offset:%" PRId64 ", file:%s", pWal->cfg.vgId,
offset + pos, fnameStr); offset + pos, fnameStr);
haystack = buf + pos + 1; haystack = buf + pos + 1;
@ -179,46 +192,41 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
} }
// found one // found one
found = pos; retVer = logContent->head.version;
lastEntryBeginOffset = offset + pos;
lastEntryEndOffset = offset + pos + sizeof(SWalCkHead) + logContent->head.bodyLen;
// try next
haystack = buf + pos + 1; haystack = buf + pos + 1;
} }
if (found >= 0 || offset == 0) break; if (end == fileSize) firstTrial = false;
if (firstTrial && terrno == TSDB_CODE_SUCCESS) continue;
// go backwards, e.g. by at most one WAL scan buf size if (retVer >= 0 || offset == 0) break;
end = TMIN(offset + walCkHeadSz - 1, fileSize);
firstTrial = false;
} }
// determine end of last entry if (retVer < 0) {
SWalCkHead* lastEntry = (found >= 0) ? (SWalCkHead*)(buf + found) : NULL;
int64_t retVer = -1;
int64_t lastEntryBeginOffset = 0;
int64_t lastEntryEndOffset = 0;
if (lastEntry == NULL) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
} else {
retVer = lastEntry->head.version;
lastEntryBeginOffset = offset + (int64_t)((char*)lastEntry - (char*)buf);
lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
} }
// truncate file // truncate file
if (lastEntryEndOffset != fileSize) { if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr, wWarn("vgId:%d, repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
lastEntryEndOffset, fileSize); lastEntryEndOffset, fileSize);
if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) { if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr); wError("failed to truncate file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (taosFsyncFile(pFile) < 0) { if (taosFsyncFile(pFile) < 0) {
wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr); wError("failed to fsync file due to %s. file:%s", strerror(errno), fnameStr);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
} }
pFileInfo->fileSize = lastEntryEndOffset; pFileInfo->fileSize = lastEntryEndOffset;
taosCloseFile(&pFile); taosCloseFile(&pFile);
@ -621,6 +629,7 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->createTs = ts; pNewInfo->createTs = ts;
pNewInfo->closeTs = -1; pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0; pNewInfo->fileSize = 0;
pNewInfo->syncedOffset = 0;
taosArrayPush(pArray, pNewInfo); taosArrayPush(pArray, pNewInfo);
taosMemoryFree(pNewInfo); taosMemoryFree(pNewInfo);
return 0; return 0;
@ -771,6 +780,12 @@ static int walFindCurMetaVer(SWal* pWal) {
return metaVer; return metaVer;
} }
void walUpdateSyncedOffset(SWal* pWal) {
SWalFileInfo* pFileInfo = walGetCurFileInfo(pWal);
if (pFileInfo == NULL) return;
pFileInfo->syncedOffset = pFileInfo->fileSize;
}
int walSaveMeta(SWal* pWal) { int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal); int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
@ -790,6 +805,9 @@ int walSaveMeta(SWal* pWal) {
return -1; return -1;
} }
// update synced offset
(void)walUpdateSyncedOffset(pWal);
// flush to a tmpfile // flush to a tmpfile
n = walBuildTmpMetaName(pWal, tmpFnameStr); n = walBuildTmpMetaName(pWal, tmpFnameStr);
ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name"); ASSERT(n < sizeof(tmpFnameStr) && "Buffer overflow of file name");

View File

@ -187,6 +187,13 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
return 0; return 0;
} }
int32_t walPersist(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex);
int32_t ret = walSaveMeta(pWal);
taosThreadMutexUnlock(&pWal->mutex);
return ret;
}
void walClose(SWal *pWal) { void walClose(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
(void)walSaveMeta(pWal); (void)walSaveMeta(pWal);

View File

@ -198,7 +198,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return -1; return -1;
} }
wDebug("vgId:%d, wal version reset from index:%" PRId64 "(invalid:%d) to index:%" PRId64, pReader->pWal->cfg.vgId, wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId,
pReader->curVersion, pReader->curInvalid, ver); pReader->curVersion, pReader->curInvalid, ver);
pReader->curVersion = ver; pReader->curVersion = ver;
@ -347,22 +347,47 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code; int64_t code;
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer);
// TODO: valid ver // TODO: valid ver
if (ver > pRead->pWal->vers.commitVer) { if (ver > pRead->pWal->vers.appliedVer) {
return -1; return -1;
} }
if (pRead->curInvalid || pRead->curVersion != ver) { if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReadSeekVer(pRead, ver);
if (code < 0) return -1; if (code < 0) {
pRead->curVersion = ver;
pRead->curInvalid = 1;
return -1;
}
seeked = true;
} }
ASSERT(taosValidFile(pRead->pLogFile) == true); while (1) {
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) {
if (code != sizeof(SWalCkHead)) { break;
return -1; } else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver);
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
ASSERT(0);
pRead->curInvalid = 1;
return -1;
}
} }
code = walValidHeadCksum(pHead); code = walValidHeadCksum(pHead);
@ -373,13 +398,20 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1; return -1;
} }
pRead->curInvalid = 0;
return 0; return 0;
} }
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code; int64_t code;
// ASSERT(pRead->curVersion == pHead->head.version); wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
ASSERT(pRead->curVersion == pHead->head.version);
ASSERT(pRead->curInvalid == 0);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
@ -397,6 +429,11 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head); SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer);
if (pRead->capacity < pReadHead->bodyLen) { if (pRead->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
@ -409,19 +446,32 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
} }
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver); pReadHead->version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (walValidBodyCksum(*ppHead) != 0) { if (walValidBodyCksum(*ppHead) != 0) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver); ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;

View File

@ -16,6 +16,7 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tglobal.h"
#include "walInt.h" #include "walInt.h"
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
@ -252,23 +253,36 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
} }
} }
if (walGetLastFileCachedSize(pWal) > tsWalFsyncDataSizeLimit) {
if (walSaveMeta(pWal) < 0) {
return -1;
}
}
return 0; return 0;
} }
int32_t walBeginSnapshot(SWal *pWal, int64_t ver) { int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
pWal->vers.verInSnapshotting = ver; pWal->vers.verInSnapshotting = ver;
wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64, wDebug("vgId:%d, wal begin snapshot for version %" PRId64 ", first ver %" PRId64 ", last ver %" PRId64,
pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer);
// check file rolling // check file rolling
if (pWal->cfg.retentionPeriod == 0) { if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex);
if (walGetLastFileSize(pWal) != 0) { if (walGetLastFileSize(pWal) != 0) {
walRollImpl(pWal); if (walRollImpl(pWal) < 0) {
wError("vgId:%d, failed to roll wal files since %s", pWal->cfg.vgId, terrstr());
goto _err;
}
} }
taosThreadMutexUnlock(&pWal->mutex);
} }
taosThreadMutexUnlock(&pWal->mutex);
return 0; return 0;
_err:
taosThreadMutexUnlock(&pWal->mutex);
return -1;
} }
int32_t walEndSnapshot(SWal *pWal) { int32_t walEndSnapshot(SWal *pWal) {

View File

@ -227,9 +227,7 @@ void taosGetSystemInfo() {
#ifdef WINDOWS #ifdef WINDOWS
taosGetCpuCores(&tsNumOfCores); taosGetCpuCores(&tsNumOfCores);
taosGetTotalMemory(&tsTotalMemoryKB); taosGetTotalMemory(&tsTotalMemoryKB);
taosGetCpuUsage(NULL, NULL);
double tmp1, tmp2, tmp3, tmp4;
taosGetCpuUsage(&tmp1, &tmp2);
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
long physical_pages = sysconf(_SC_PHYS_PAGES); long physical_pages = sysconf(_SC_PHYS_PAGES);
long page_size = sysconf(_SC_PAGESIZE); long page_size = sysconf(_SC_PAGESIZE);
@ -240,9 +238,7 @@ void taosGetSystemInfo() {
taosGetProcIOnfos(); taosGetProcIOnfos();
taosGetCpuCores(&tsNumOfCores); taosGetCpuCores(&tsNumOfCores);
taosGetTotalMemory(&tsTotalMemoryKB); taosGetTotalMemory(&tsTotalMemoryKB);
taosGetCpuUsage(NULL, NULL);
double tmp1, tmp2, tmp3, tmp4;
taosGetCpuUsage(&tmp1, &tmp2);
#endif #endif
} }
@ -447,8 +443,8 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
static int64_t curSysTotal = 0; static int64_t curSysTotal = 0;
static int64_t curProcTotal = 0; static int64_t curProcTotal = 0;
*cpu_system = 0; if (cpu_system != NULL) *cpu_system = 0;
*cpu_engine = 0; if (cpu_engine != NULL) *cpu_engine = 0;
SysCpuInfo sysCpu = {0}; SysCpuInfo sysCpu = {0};
ProcCpuInfo procCpu = {0}; ProcCpuInfo procCpu = {0};
@ -458,8 +454,12 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime; curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) { if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; if (cpu_system != NULL) {
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; *cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
}
if (cpu_engine != NULL) {
*cpu_engine = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
}
} }
lastSysUsed = curSysUsed; lastSysUsed = curSysUsed;