commit
610d730d54
|
@ -162,7 +162,7 @@ Master Vnode遵循下面的写入流程:
|
||||||
|
|
||||||
<center> 图 3 TDengine Master写入流程 </center>
|
<center> 图 3 TDengine Master写入流程 </center>
|
||||||
1. Master vnode收到应用的数据插入请求,验证OK,进入下一步;
|
1. Master vnode收到应用的数据插入请求,验证OK,进入下一步;
|
||||||
2. 如果系统配置参数walLevel打开(设置为2),vnode将把该请求的原始数据包写入数据库日志文件WAL,以保证TDengine能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失;
|
2. 如果系统配置参数walLevel大于0,vnode将把该请求的原始数据包写入数据库日志文件WAL。如果walLevel设置为2,而且fsync设置为0,TDengine还将WAL数据立即落盘,以保证即使宕机,也能从数据库日志文件中恢复数据,避免数据的丢失;
|
||||||
3. 如果有多个副本,vnode将把数据包转发给同一虚拟节点组内slave vnodes, 该转发包带有数据的版本号(version);
|
3. 如果有多个副本,vnode将把数据包转发给同一虚拟节点组内slave vnodes, 该转发包带有数据的版本号(version);
|
||||||
4. 写入内存,并加记录加入到skip list;
|
4. 写入内存,并加记录加入到skip list;
|
||||||
5. Master vnode返回确认信息给应用,表示写入成功。
|
5. Master vnode返回确认信息给应用,表示写入成功。
|
||||||
|
@ -174,7 +174,7 @@ Master Vnode遵循下面的写入流程:
|
||||||
|
|
||||||
<center> 图 4 TDengine Slave写入流程 </center>
|
<center> 图 4 TDengine Slave写入流程 </center>
|
||||||
1. Slave vnode收到Master vnode转发了的数据插入请求。
|
1. Slave vnode收到Master vnode转发了的数据插入请求。
|
||||||
2. 如果系统配置参数walLevl设置为2,vnode将把该请求的原始数据包写入日志(WAL);
|
2. 如果系统配置参数walLevel大于0,vnode将把该请求的原始数据包写入数据库日志文件WAL。如果walLevel设置为2,而且fsync设置为0,TDengine还将WAL数据立即落盘,以保证即使宕机,也能从数据库日志文件中恢复数据,避免数据的丢失;
|
||||||
3. 写入内存,更新内存中的skip list。
|
3. 写入内存,更新内存中的skip list。
|
||||||
|
|
||||||
与Master vnode相比,slave vnode不存在转发环节,也不存在回复确认环节,少了两步。但写内存与WAL是完全一样的。
|
与Master vnode相比,slave vnode不存在转发环节,也不存在回复确认环节,少了两步。但写内存与WAL是完全一样的。
|
||||||
|
|
|
@ -399,7 +399,7 @@ int tsParseSql(SSqlObj *pSql, bool initial);
|
||||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
|
||||||
int tscProcessSql(SSqlObj *pSql);
|
int tscProcessSql(SSqlObj *pSql);
|
||||||
|
|
||||||
int tscRenewTableMeta(SSqlObj *pSql, char *tableId);
|
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
|
||||||
void tscQueueAsyncRes(SSqlObj *pSql);
|
void tscQueueAsyncRes(SSqlObj *pSql);
|
||||||
|
|
||||||
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
|
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
|
||||||
|
@ -414,7 +414,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
|
||||||
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
||||||
void tscDestroyResPointerInfo(SSqlRes *pRes);
|
void tscDestroyResPointerInfo(SSqlRes *pRes);
|
||||||
|
|
||||||
void tscResetSqlCmdObj(SSqlCmd *pCmd);
|
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* free query result of the sql object
|
* free query result of the sql object
|
||||||
|
|
|
@ -468,7 +468,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
|
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
|
||||||
tscDebug("%p redo parse sql string and proceed", pSql);
|
tscDebug("%p redo parse sql string and proceed", pSql);
|
||||||
pCmd->parseFinished = false;
|
pCmd->parseFinished = false;
|
||||||
tscResetSqlCmdObj(pCmd);
|
tscResetSqlCmdObj(pCmd, false);
|
||||||
|
|
||||||
code = tsParseSql(pSql, true);
|
code = tsParseSql(pSql, true);
|
||||||
|
|
||||||
|
|
|
@ -1327,18 +1327,40 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
|
||||||
pSql->fetchFp = pSql->fp;
|
pSql->fetchFp = pSql->fp;
|
||||||
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
|
pSql->fp = (void(*)())tscHandleMultivnodeInsert;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
|
if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make a backup as tsParseInsertSql may modify the string
|
||||||
|
char* sqlstr = strdup(pSql->sqlstr);
|
||||||
ret = tsParseInsertSql(pSql);
|
ret = tsParseInsertSql(pSql);
|
||||||
|
if (sqlstr == NULL || pSql->retry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
|
||||||
|
free(sqlstr);
|
||||||
|
} else {
|
||||||
|
tscResetSqlCmdObj(pCmd, true);
|
||||||
|
free(pSql->sqlstr);
|
||||||
|
pSql->sqlstr = sqlstr;
|
||||||
|
pSql->retry++;
|
||||||
|
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
|
||||||
|
ret = tsParseInsertSql(pSql);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
|
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
|
||||||
ret = tscToSQLCmd(pSql, &SQLInfo);
|
ret = tscToSQLCmd(pSql, &SQLInfo);
|
||||||
|
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->retry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
|
||||||
|
tscResetSqlCmdObj(pCmd, true);
|
||||||
|
pSql->retry++;
|
||||||
|
ret = tscToSQLCmd(pSql, &SQLInfo);
|
||||||
|
}
|
||||||
SQLInfoDestroy(&SQLInfo);
|
SQLInfoDestroy(&SQLInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ret == TSDB_CODE_SUCCESS) {
|
||||||
|
pSql->retry = 0;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
|
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
|
||||||
* so do NOT use pRes->code to determine if the getTableMeta function
|
* so do NOT use pRes->code to determine if the getTableMeta function
|
||||||
|
|
|
@ -276,8 +276,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
|
||||||
|
|
||||||
int32_t cmd = pCmd->command;
|
int32_t cmd = pCmd->command;
|
||||||
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
||||||
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
|
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
|
||||||
|
@ -302,7 +300,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
taosMsleep(duration);
|
taosMsleep(duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
|
rpcMsg->code = tscRenewTableMeta(pSql, 0);
|
||||||
|
|
||||||
// if there is an error occurring, proceed to the following error handling procedure.
|
// if there is an error occurring, proceed to the following error handling procedure.
|
||||||
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
@ -2202,14 +2200,14 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
|
||||||
/**
|
/**
|
||||||
* retrieve table meta from mnode, and update the local table meta cache.
|
* retrieve table meta from mnode, and update the local table meta cache.
|
||||||
* @param pSql sql object
|
* @param pSql sql object
|
||||||
* @param tableId table full name
|
* @param tableIndex table index
|
||||||
* @return status code
|
* @return status code
|
||||||
*/
|
*/
|
||||||
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) {
|
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
|
||||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
|
||||||
|
|
||||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||||
if (pTableMetaInfo->pTableMeta) {
|
if (pTableMetaInfo->pTableMeta) {
|
||||||
|
|
|
@ -820,7 +820,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
|
|
||||||
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
|
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
|
||||||
// must before clean the sqlcmd object
|
// must before clean the sqlcmd object
|
||||||
tscResetSqlCmdObj(&pSql->cmd);
|
tscResetSqlCmdObj(&pSql->cmd, false);
|
||||||
|
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
|
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
|
||||||
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
|
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
|
||||||
|
|
||||||
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
|
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
|
||||||
if (pTagCond->pCond == NULL) {
|
if (pTagCond->pCond == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -294,7 +294,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
||||||
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
|
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tscFreeQueryInfo(SSqlCmd* pCmd) {
|
static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
|
||||||
if (pCmd == NULL || pCmd->numOfClause == 0) {
|
if (pCmd == NULL || pCmd->numOfClause == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -304,7 +304,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) {
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
|
||||||
|
|
||||||
freeQueryInfoImpl(pQueryInfo);
|
freeQueryInfoImpl(pQueryInfo);
|
||||||
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false);
|
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, removeFromCache);
|
||||||
taosTFree(pQueryInfo);
|
taosTFree(pQueryInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +312,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) {
|
||||||
taosTFree(pCmd->pQueryInfo);
|
taosTFree(pCmd->pQueryInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
|
void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
|
||||||
pCmd->command = 0;
|
pCmd->command = 0;
|
||||||
pCmd->numOfCols = 0;
|
pCmd->numOfCols = 0;
|
||||||
pCmd->count = 0;
|
pCmd->count = 0;
|
||||||
|
@ -326,7 +326,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
|
||||||
|
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
|
|
||||||
tscFreeQueryInfo(pCmd);
|
tscFreeQueryInfo(pCmd, removeFromCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeSqlResult(SSqlObj* pSql) {
|
void tscFreeSqlResult(SSqlObj* pSql) {
|
||||||
|
@ -364,7 +364,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
|
||||||
taosTFree(pSql->pSubs);
|
taosTFree(pSql->pSubs);
|
||||||
pSql->numOfSubs = 0;
|
pSql->numOfSubs = 0;
|
||||||
|
|
||||||
tscResetSqlCmdObj(pCmd);
|
tscResetSqlCmdObj(pCmd, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeSqlObj(SSqlObj* pSql) {
|
void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
|
|
|
@ -154,9 +154,14 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
|
||||||
|
|
||||||
// todo refactor to more generic
|
// todo refactor to more generic
|
||||||
int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
|
int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
|
||||||
int32_t v = *(int32_t *)value;
|
int32_t v = 0;
|
||||||
int32_t index = -1;
|
switch(pBucket->type) {
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: v = *(int16_t*) value; break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: v = *(int8_t*) value; break;
|
||||||
|
default: v = *(int32_t*) value;break;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t index = -1;
|
||||||
if (pBucket->range.iMaxVal == INT32_MIN) {
|
if (pBucket->range.iMaxVal == INT32_MIN) {
|
||||||
/*
|
/*
|
||||||
* taking negative integer into consideration,
|
* taking negative integer into consideration,
|
||||||
|
|
|
@ -693,7 +693,7 @@ class DbConnRest(DbConn):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self._type = self.TYPE_REST
|
self._type = self.TYPE_REST
|
||||||
self._url = "http://localhost:6020/rest/sql" # fixed for now
|
self._url = "http://localhost:6041/rest/sql" # fixed for now
|
||||||
self._result = None
|
self._result = None
|
||||||
|
|
||||||
def openByType(self): # Open connection
|
def openByType(self): # Open connection
|
||||||
|
@ -1306,6 +1306,7 @@ class DbManager():
|
||||||
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
|
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
else:
|
else:
|
||||||
|
print("Failed to connect to DB, errno = {}, msg: {}".format(Helper.convertErrno(err.errno), err.msg))
|
||||||
raise
|
raise
|
||||||
except BaseException:
|
except BaseException:
|
||||||
print("[=] Unexpected exception")
|
print("[=] Unexpected exception")
|
||||||
|
@ -1910,10 +1911,19 @@ class TaskReadData(StateTransitionTask):
|
||||||
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
|
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
|
||||||
'sum(speed)',
|
'sum(speed)',
|
||||||
'stddev(speed)',
|
'stddev(speed)',
|
||||||
|
# SELECTOR functions
|
||||||
'min(speed)',
|
'min(speed)',
|
||||||
'max(speed)',
|
'max(speed)',
|
||||||
'first(speed)',
|
'first(speed)',
|
||||||
'last(speed)']) # TODO: add more from 'top'
|
'last(speed)',
|
||||||
|
# 'top(speed)', # TODO: not supported?
|
||||||
|
# 'bottom(speed)', # TODO: not supported?
|
||||||
|
# 'percentile(speed, 10)', # TODO: TD-1316
|
||||||
|
'last_row(speed)',
|
||||||
|
# Transformation Functions
|
||||||
|
# 'diff(speed)', # TODO: no supported?!
|
||||||
|
'spread(speed)'
|
||||||
|
]) # TODO: add more from 'top'
|
||||||
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
|
filterExpr = Dice.choice([ # TODO: add various kind of WHERE conditions
|
||||||
None
|
None
|
||||||
])
|
])
|
||||||
|
@ -2350,7 +2360,7 @@ class ServiceManagerThread:
|
||||||
self._thread2.start()
|
self._thread2.start()
|
||||||
|
|
||||||
# wait for service to start
|
# wait for service to start
|
||||||
for i in range(0, 10):
|
for i in range(0, 100):
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
# self.procIpcBatch() # don't pump message during start up
|
# self.procIpcBatch() # don't pump message during start up
|
||||||
print("_zz_", end="", flush=True)
|
print("_zz_", end="", flush=True)
|
||||||
|
@ -2358,7 +2368,7 @@ class ServiceManagerThread:
|
||||||
logger.info("[] TDengine service READY to process requests")
|
logger.info("[] TDengine service READY to process requests")
|
||||||
return # now we've started
|
return # now we've started
|
||||||
# TODO: handle this better?
|
# TODO: handle this better?
|
||||||
self.procIpcBatch(20, True) # display output before cronking out, trim to last 20 msgs, force output
|
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
|
||||||
raise RuntimeError("TDengine service did not start successfully")
|
raise RuntimeError("TDengine service did not start successfully")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
@ -2768,7 +2778,7 @@ class MainExec:
|
||||||
try:
|
try:
|
||||||
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
|
ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
|
||||||
except requests.exceptions.ConnectionError as err:
|
except requests.exceptions.ConnectionError as err:
|
||||||
logger.warning("Failed to open REST connection to DB")
|
logger.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
|
||||||
# don't raise
|
# don't raise
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue