Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/async_commit
This commit is contained in:
commit
df6eb27b75
|
@ -64,6 +64,11 @@ extern int32_t tsNumOfSnodeStreamThreads;
|
|||
extern int32_t tsNumOfSnodeWriteThreads;
|
||||
extern int64_t tsRpcQueueMemoryAllowed;
|
||||
|
||||
// sync raft
|
||||
extern int32_t tsElectInterval;
|
||||
extern int32_t tsHeartbeatInterval;
|
||||
extern int32_t tsHeartbeatTimeout;
|
||||
|
||||
// monitor
|
||||
extern bool tsEnableMonitor;
|
||||
extern int32_t tsMonitorInterval;
|
||||
|
@ -126,9 +131,9 @@ extern char tsUdfdResFuncs[];
|
|||
extern char tsUdfdLdLibPath[];
|
||||
|
||||
// schemaless
|
||||
extern char tsSmlChildTableName[];
|
||||
extern char tsSmlTagName[];
|
||||
extern bool tsSmlDataFormat;
|
||||
extern char tsSmlChildTableName[];
|
||||
extern char tsSmlTagName[];
|
||||
extern bool tsSmlDataFormat;
|
||||
extern int32_t tsSmlBatchSize;
|
||||
|
||||
// wal
|
||||
|
@ -146,7 +151,7 @@ extern int32_t tsUptimeInterval;
|
|||
extern int32_t tsRpcRetryLimit;
|
||||
extern int32_t tsRpcRetryInterval;
|
||||
|
||||
//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||
|
||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
|
||||
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc);
|
||||
|
|
|
@ -43,7 +43,7 @@ extern "C" {
|
|||
#define SYNC_MAX_RETRY_BACKOFF 5
|
||||
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
|
||||
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
||||
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
|
||||
#define SYNC_HEART_TIMEOUT_MS 1000 * 15
|
||||
|
||||
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
||||
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
||||
|
|
|
@ -55,6 +55,11 @@ int32_t tsNumOfQnodeFetchThreads = 1;
|
|||
int32_t tsNumOfSnodeStreamThreads = 4;
|
||||
int32_t tsNumOfSnodeWriteThreads = 1;
|
||||
|
||||
// sync raft
|
||||
int32_t tsElectInterval = 25 * 1000;
|
||||
int32_t tsHeartbeatInterval = 1000;
|
||||
int32_t tsHeartbeatTimeout = 20 * 1000;
|
||||
|
||||
// monitor
|
||||
bool tsEnableMonitor = true;
|
||||
int32_t tsMonitorInterval = 30;
|
||||
|
@ -74,8 +79,8 @@ char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
|
|||
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
|
||||
// If set to empty system will generate table name using MD5 hash.
|
||||
// true means that the name and order of cols in each line are the same(only for influx protocol)
|
||||
bool tsSmlDataFormat = false;
|
||||
int32_t tsSmlBatchSize = 10000;
|
||||
bool tsSmlDataFormat = false;
|
||||
int32_t tsSmlBatchSize = 10000;
|
||||
|
||||
// query
|
||||
int32_t tsQueryPolicy = 1;
|
||||
|
@ -198,9 +203,7 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
|
|||
int32_t taosSetTfsCfg(SConfig *pCfg);
|
||||
#endif
|
||||
|
||||
struct SConfig *taosGetCfg() {
|
||||
return tsCfg;
|
||||
}
|
||||
struct SConfig *taosGetCfg() { return tsCfg; }
|
||||
|
||||
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
|
||||
char *apolloUrl) {
|
||||
|
@ -423,6 +426,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
|
||||
|
@ -728,6 +735,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
|
||||
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
|
||||
|
||||
tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32;
|
||||
tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32;
|
||||
tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32;
|
||||
|
||||
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
||||
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
|
||||
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
|
||||
|
@ -737,6 +748,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
|
||||
tsWalFsyncDataSizeLimit = cfgGetItem(pCfg, "walFsyncDataSizeLimit")->i64;
|
||||
|
||||
tsElectInterval = cfgGetItem(pCfg, "syncElectInterval")->i32;
|
||||
tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32;
|
||||
tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32;
|
||||
|
||||
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
|
||||
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
|
||||
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));
|
||||
|
|
|
@ -170,7 +170,9 @@ struct STsdbReader {
|
|||
SIOCostSummary cost;
|
||||
STSchema* pSchema; // the newest version schema
|
||||
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
|
||||
SDataFReader* pFileReader;
|
||||
SDataFReader* pFileReader; // the file reader
|
||||
SDelFReader* pDelFReader; // the del file reader
|
||||
SArray* pDelIdx; // del file block index;
|
||||
SVersionRange verRange;
|
||||
SBlockInfoBuf blockInfoBuf;
|
||||
int32_t step;
|
||||
|
@ -2531,42 +2533,18 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STsdb* pTsdb = pReader->pTsdb;
|
||||
|
||||
SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
ASSERT(pReader->pReadSnap != NULL);
|
||||
|
||||
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
|
||||
if (pDelFile) {
|
||||
SDelFReader* pDelFReader = NULL;
|
||||
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
|
||||
if (aDelIdx == NULL) {
|
||||
tsdbDelFReaderClose(&pDelFReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// TODO: opt the perf of read del index
|
||||
code = tsdbReadDelIdx(pDelFReader, aDelIdx);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(aDelIdx);
|
||||
tsdbDelFReaderClose(&pDelFReader);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
|
||||
SDelIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
||||
SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);
|
||||
SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);
|
||||
|
||||
if (pIdx != NULL) {
|
||||
code = tsdbReadDelData(pDelFReader, pIdx, pDelData);
|
||||
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(aDelIdx);
|
||||
tsdbDelFReaderClose(&pDelFReader);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -2662,6 +2640,30 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(pIndexList);
|
||||
|
||||
if (pReader->pReadSnap != NULL) {
|
||||
|
||||
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
|
||||
if (pReader->pDelFReader == NULL && pDelFile != NULL) {
|
||||
int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
|
||||
if (pReader->pDelIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pReader->pDelIdx);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3912,6 +3914,15 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
if (pReader->pDelFReader != NULL) {
|
||||
tsdbDelFReaderClose(&pReader->pDelFReader);
|
||||
}
|
||||
|
||||
if (pReader->pDelIdx != NULL) {
|
||||
taosArrayDestroy(pReader->pDelIdx);
|
||||
pReader->pDelIdx = NULL;
|
||||
}
|
||||
|
||||
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);
|
||||
|
||||
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
||||
|
@ -3953,6 +3964,9 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
|||
blockDataCleanup(pBlock);
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
if (taosHashGetSize(pStatus->pTableMap) == 0){
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pStatus->loadFromFile) {
|
||||
int32_t code = buildBlockFromFiles(pReader);
|
||||
|
@ -3970,8 +3984,6 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
|||
buildBlockFromBufferSequentially(pReader);
|
||||
return pBlock->info.rows > 0;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||
|
|
|
@ -1477,9 +1477,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
|
|||
}
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
*ppReader = NULL;
|
||||
|
||||
_exit:
|
||||
*ppReader = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,8 +22,8 @@
|
|||
#include "syncEnv.h"
|
||||
#include "syncIndexMgr.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncPipeline.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncPipeline.h"
|
||||
#include "syncRaftCfg.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
|
@ -35,6 +35,7 @@
|
|||
#include "syncTimeout.h"
|
||||
#include "syncUtil.h"
|
||||
#include "syncVoteMgr.h"
|
||||
#include "tglobal.h"
|
||||
#include "tref.h"
|
||||
|
||||
static void syncNodeEqPingTimer(void* param, void* tmrId);
|
||||
|
@ -1115,7 +1116,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
pSyncNode->hbrSlowNum = 0;
|
||||
pSyncNode->tmrRoutineNum = 0;
|
||||
|
||||
sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);
|
||||
sNInfo(pSyncNode, "sync open, node:%p", pSyncNode);
|
||||
sTrace("vgId:%d, tsElectInterval:%d, tsHeartbeatInterval:%d, tsHeartbeatTimeout:%d", pSyncNode->vgId, tsElectInterval,
|
||||
tsHeartbeatInterval, tsHeartbeatTimeout);
|
||||
|
||||
return pSyncNode;
|
||||
|
||||
|
@ -1229,7 +1232,7 @@ void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
|
|||
|
||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode == NULL) return;
|
||||
sNTrace(pSyncNode, "sync close, data:%p", pSyncNode);
|
||||
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
|
||||
|
||||
int32_t ret = raftStoreClose(pSyncNode->pRaftStore);
|
||||
ASSERT(ret == 0);
|
||||
|
|
Loading…
Reference in New Issue