Merge remote-tracking branch 'origin/3.0' into fix/mnode

This commit is contained in:
Shengliang Guan 2022-07-05 19:08:30 +08:00
commit 93962a8c6d
20 changed files with 287 additions and 128 deletions

View File

@ -519,6 +519,8 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
} }
} }
syncNodeRelease(pSyncNode);
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} }

View File

@ -15,10 +15,10 @@
#include "builtins.h" #include "builtins.h"
#include "builtinsimpl.h" #include "builtinsimpl.h"
#include "cJSON.h"
#include "querynodes.h" #include "querynodes.h"
#include "scalar.h" #include "scalar.h"
#include "taoserror.h" #include "taoserror.h"
#include "cJSON.h"
static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) { static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList; va_list vArgList;
@ -52,10 +52,9 @@ static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
return TIME_UNIT_TOO_SMALL; return TIME_UNIT_TOO_SMALL;
} }
if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && if (pVal->literal[0] != '1' ||
pVal->literal[1] != 's' && pVal->literal[1] != 'm' && (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w')) {
pVal->literal[1] != 'w')) {
return TIME_UNIT_INVALID; return TIME_UNIT_INVALID;
} }
@ -701,7 +700,8 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be greater than db precision"); "ELAPSED function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
} }
@ -1223,7 +1223,8 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be greater than db precision"); "STATEDURATION function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
} }
@ -1432,10 +1433,6 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
// first(col_list) will be rewritten as first(col) // first(col_list) will be rewritten as first(col)
if (2 != LIST_LENGTH(pFunc->pParameterList)) { // input has two params c0,ts, is this a bug?
return TSDB_CODE_SUCCESS;
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
uint8_t paraType = ((SExprNode*)pPara)->resType.type; uint8_t paraType = ((SExprNode*)pPara)->resType.type;
int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes; int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes;
@ -1738,7 +1735,8 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be greater than db precision"); "TIMETRUNCATE function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
@ -1777,7 +1775,8 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be greater than db precision"); "TIMEDIFF function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) { } else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); "TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
} }
} }

View File

@ -385,6 +385,15 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight) { SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (OP_TYPE_MINUS == type && QUERY_NODE_VALUE == nodeType(pLeft)) {
SValueNode* pVal = (SValueNode*)pLeft;
char* pNewLiteral = taosMemoryCalloc(1, strlen(pVal->literal) + 2);
CHECK_OUT_OF_MEM(pNewLiteral);
sprintf(pNewLiteral, "-%s", pVal->literal);
taosMemoryFree(pVal->literal);
pVal->literal = pNewLiteral;
return pLeft;
}
SOperatorNode* op = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR); SOperatorNode* op = (SOperatorNode*)nodesMakeNode(QUERY_NODE_OPERATOR);
CHECK_OUT_OF_MEM(op); CHECK_OUT_OF_MEM(op);
op->opType = type; op->opType = type;

View File

@ -558,11 +558,11 @@ static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColum
pCol->node.resType = pExpr->resType; pCol->node.resType = pExpr->resType;
} }
static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* pTable, SNodeList* pList) { static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* pTable, bool igTags, SNodeList* pList) {
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
int32_t nums = int32_t nums = pMeta->tableInfo.numOfColumns +
pMeta->tableInfo.numOfColumns + ((TSDB_SUPER_TABLE == pMeta->tableType) ? pMeta->tableInfo.numOfTags : 0); (igTags ? 0 : ((TSDB_SUPER_TABLE == pMeta->tableType) ? pMeta->tableInfo.numOfTags : 0));
for (int32_t i = 0; i < nums; ++i) { for (int32_t i = 0; i < nums; ++i) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
@ -878,6 +878,9 @@ static EDealRes translateNormalValue(STranslateContext* pCxt, SValueNode* pVal,
} }
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: { case TSDB_DATA_TYPE_VARBINARY: {
if (strict && (pVal->node.resType.bytes > targetDt.bytes - VARSTR_HEADER_SIZE)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1); pVal->datum.p = taosMemoryCalloc(1, targetDt.bytes + 1);
if (NULL == pVal->datum.p) { if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY); return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
@ -1333,6 +1336,9 @@ static int32_t rewriteSystemInfoFuncImpl(STranslateContext* pCxt, char* pLiteral
pVal->isNull = true; pVal->isNull = true;
} else { } else {
pVal->literal = pLiteral; pVal->literal = pLiteral;
if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
pVal->node.resType.bytes = strlen(pLiteral);
}
} }
if (DEAL_RES_ERROR != translateValue(pCxt, pVal)) { if (DEAL_RES_ERROR != translateValue(pCxt, pVal)) {
*pNode = (SNode*)pVal; *pNode = (SNode*)pVal;
@ -1928,7 +1934,7 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
return code; return code;
} }
static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) { static int32_t createAllColumns(STranslateContext* pCxt, bool igTags, SNodeList** pCols) {
*pCols = nodesMakeList(); *pCols = nodesMakeList();
if (NULL == *pCols) { if (NULL == *pCols) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
@ -1937,7 +1943,7 @@ static int32_t createAllColumns(STranslateContext* pCxt, SNodeList** pCols) {
size_t nums = taosArrayGetSize(pTables); size_t nums = taosArrayGetSize(pTables);
for (size_t i = 0; i < nums; ++i) { for (size_t i = 0; i < nums; ++i) {
STableNode* pTable = taosArrayGetP(pTables, i); STableNode* pTable = taosArrayGetP(pTables, i);
int32_t code = createColumnsByTable(pCxt, pTable, *pCols); int32_t code = createColumnsByTable(pCxt, pTable, igTags, *pCols);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
@ -1974,7 +1980,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
return (SNode*)pFunc; return (SNode*)pFunc;
} }
static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SNodeList** pOutput) { static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, bool igTags, SNodeList** pOutput) {
STableNode* pTable = NULL; STableNode* pTable = NULL;
int32_t code = findTable(pCxt, pCol->tableAlias, &pTable); int32_t code = findTable(pCxt, pCol->tableAlias, &pTable);
if (TSDB_CODE_SUCCESS == code && NULL == *pOutput) { if (TSDB_CODE_SUCCESS == code && NULL == *pOutput) {
@ -1984,7 +1990,7 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, SN
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createColumnsByTable(pCxt, pTable, *pOutput); code = createColumnsByTable(pCxt, pTable, igTags, *pOutput);
} }
return code; return code;
} }
@ -2006,11 +2012,9 @@ static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrc
SNode* pPara = NULL; SNode* pPara = NULL;
FOREACH(pPara, pSrcParas) { FOREACH(pPara, pSrcParas) {
if (isStar(pPara)) { if (isStar(pPara)) {
code = createAllColumns(pCxt, &pExprs); code = createAllColumns(pCxt, true, &pExprs);
// The syntax definition ensures that * and other parameters do not appear at the same time
break;
} else if (isTableStar(pPara)) { } else if (isTableStar(pPara)) {
code = createTableAllCols(pCxt, (SColumnNode*)pPara, &pExprs); code = createTableAllCols(pCxt, (SColumnNode*)pPara, true, &pExprs);
} else { } else {
code = nodesListMakeStrictAppend(&pExprs, nodesCloneNode(pPara)); code = nodesListMakeStrictAppend(&pExprs, nodesCloneNode(pPara));
} }
@ -2069,7 +2073,7 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (isStar(pNode)) { if (isStar(pNode)) {
SNodeList* pCols = NULL; SNodeList* pCols = NULL;
code = createAllColumns(pCxt, &pCols); code = createAllColumns(pCxt, false, &pCols);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
INSERT_LIST(pSelect->pProjectionList, pCols); INSERT_LIST(pSelect->pProjectionList, pCols);
ERASE_NODE(pSelect->pProjectionList); ERASE_NODE(pSelect->pProjectionList);
@ -2085,7 +2089,7 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect) {
} }
} else if (isTableStar(pNode)) { } else if (isTableStar(pNode)) {
SNodeList* pCols = NULL; SNodeList* pCols = NULL;
code = createTableAllCols(pCxt, (SColumnNode*)pNode, &pCols); code = createTableAllCols(pCxt, (SColumnNode*)pNode, false, &pCols);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
INSERT_LIST(pSelect->pProjectionList, pCols); INSERT_LIST(pSelect->pProjectionList, pCols);
ERASE_NODE(pSelect->pProjectionList); ERASE_NODE(pSelect->pProjectionList);
@ -5826,10 +5830,6 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
return pCxt->errCode; return pCxt->errCode;
} }
if (IS_VAR_DATA_TYPE(pSchema->type) && strlen(pStmt->pVal->literal) > pSchema->bytes) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pStmt->pVal->literal);
}
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
if (targetDt.type == TSDB_DATA_TYPE_JSON) { if (targetDt.type == TSDB_DATA_TYPE_JSON) {
pReq->isNull = 0; pReq->isNull = 0;

View File

@ -236,7 +236,6 @@ int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char*
const char* prefix = "syntax error"; const char* prefix = "syntax error";
if (sourceStr == NULL) { if (sourceStr == NULL) {
assert(additionalInfo != NULL);
snprintf(pBuf->buf, pBuf->len, msgFormat1, additionalInfo); snprintf(pBuf->buf, pBuf->len, msgFormat1, additionalInfo);
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
@ -254,40 +253,25 @@ int32_t buildSyntaxErrMsg(SMsgBuf* pBuf, const char* additionalInfo, const char*
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
SSchema* getTableColumnSchema(const STableMeta* pTableMeta) { SSchema* getTableColumnSchema(const STableMeta* pTableMeta) { return (SSchema*)pTableMeta->schema; }
assert(pTableMeta != NULL);
return (SSchema*)pTableMeta->schema;
}
static SSchema* getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) { static SSchema* getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
assert(pTableMeta != NULL && pTableMeta->schema != NULL && colIndex >= 0 &&
colIndex < (getNumOfColumns(pTableMeta) + getNumOfTags(pTableMeta)));
SSchema* pSchema = (SSchema*)pTableMeta->schema; SSchema* pSchema = (SSchema*)pTableMeta->schema;
return &pSchema[colIndex]; return &pSchema[colIndex];
} }
SSchema* getTableTagSchema(const STableMeta* pTableMeta) { SSchema* getTableTagSchema(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL &&
(pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE));
return getOneColumnSchema(pTableMeta, getTableInfo(pTableMeta).numOfColumns); return getOneColumnSchema(pTableMeta, getTableInfo(pTableMeta).numOfColumns);
} }
int32_t getNumOfColumns(const STableMeta* pTableMeta) { int32_t getNumOfColumns(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
// table created according to super table, use data from super table // table created according to super table, use data from super table
return getTableInfo(pTableMeta).numOfColumns; return getTableInfo(pTableMeta).numOfColumns;
} }
int32_t getNumOfTags(const STableMeta* pTableMeta) { int32_t getNumOfTags(const STableMeta* pTableMeta) { return getTableInfo(pTableMeta).numOfTags; }
assert(pTableMeta != NULL);
return getTableInfo(pTableMeta).numOfTags;
}
STableComInfo getTableInfo(const STableMeta* pTableMeta) { STableComInfo getTableInfo(const STableMeta* pTableMeta) { return pTableMeta->tableInfo; }
assert(pTableMeta != NULL);
return pTableMeta->tableInfo;
}
STableMeta* tableMetaDup(const STableMeta* pTableMeta) { STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
size_t size = TABLE_META_SIZE(pTableMeta); size_t size = TABLE_META_SIZE(pTableMeta);

View File

@ -75,6 +75,11 @@ static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func)
return NULL; return NULL;
} }
static void optResetParent(SLogicNode* pNode) {
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) { ((SLogicNode*)pChild)->pParent = pNode; }
}
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) { EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (QUERY_NODE_COLUMN == nodeType(pNode)) {
// *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType); // *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
@ -1540,6 +1545,7 @@ static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicN
pSort->groupSort = rewriteTailOptNeedGroupSort(pIndef); pSort->groupSort = rewriteTailOptNeedGroupSort(pIndef);
TSWAP(pSort->node.pChildren, pIndef->node.pChildren); TSWAP(pSort->node.pChildren, pIndef->node.pChildren);
optResetParent((SLogicNode*)pSort);
pSort->node.precision = pIndef->node.precision; pSort->node.precision = pIndef->node.precision;
SFunctionNode* pTail = NULL; SFunctionNode* pTail = NULL;
@ -1747,6 +1753,7 @@ static int32_t rewriteUniqueOptCreateAgg(SIndefRowsFuncLogicNode* pIndef, SLogic
} }
TSWAP(pAgg->node.pChildren, pIndef->node.pChildren); TSWAP(pAgg->node.pChildren, pIndef->node.pChildren);
optResetParent((SLogicNode*)pAgg);
pAgg->node.precision = pIndef->node.precision; pAgg->node.precision = pIndef->node.precision;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;

View File

@ -1203,7 +1203,8 @@ typedef struct SQnodeSplitInfo {
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SQnodeSplitInfo* pInfo) { SQnodeSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
((SScanLogicNode*)pNode)->scanSeq[0] < 1 && ((SScanLogicNode*)pNode)->scanSeq[1] < 1) {
pInfo->pSplitNode = pNode; pInfo->pSplitNode = pNode;
pInfo->pSubplan = pSubplan; pInfo->pSubplan = pSubplan;
return true; return true;

View File

@ -63,6 +63,10 @@ TEST_F(PlanBasicTest, uniqueFunc) {
run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10"); run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c1) a FROM t1 ORDER BY a"); run("SELECT UNIQUE(c1) a FROM t1 ORDER BY a");
run("SELECT ts, UNIQUE(c1) FROM st1 PARTITION BY TBNAME");
run("SELECT TBNAME, UNIQUE(c1) FROM st1 PARTITION BY TBNAME");
} }
TEST_F(PlanBasicTest, tailFunc) { TEST_F(PlanBasicTest, tailFunc) {
@ -81,6 +85,8 @@ TEST_F(PlanBasicTest, tailFunc) {
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10 PARTITION BY c1 LIMIT 5"); run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10 PARTITION BY c1 LIMIT 5");
run("SELECT TAIL(c1, 2, 1) FROM st1s1 UNION ALL SELECT c1 FROM st1s2"); run("SELECT TAIL(c1, 2, 1) FROM st1s1 UNION ALL SELECT c1 FROM st1s2");
run("SELECT TAIL(c1, 1) FROM st2 WHERE jtag->'tag1' > 10");
} }
TEST_F(PlanBasicTest, interpFunc) { TEST_F(PlanBasicTest, interpFunc) {

View File

@ -91,10 +91,3 @@ TEST_F(PlanOtherTest, delete) {
run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"); run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10");
} }
TEST_F(PlanOtherTest, queryPolicy) {
useDb("root", "test");
tsQueryPolicy = QUERY_POLICY_QNODE;
run("SELECT COUNT(*) FROM st1");
}

View File

@ -78,6 +78,7 @@ static void parseArg(int argc, char* argv[]) {
{"skipSql", required_argument, NULL, 's'}, {"skipSql", required_argument, NULL, 's'},
{"limitSql", required_argument, NULL, 'i'}, {"limitSql", required_argument, NULL, 'i'},
{"log", required_argument, NULL, 'l'}, {"log", required_argument, NULL, 'l'},
{"queryPolicy", required_argument, NULL, 'q'},
{0, 0, 0, 0} {0, 0, 0, 0}
}; };
// clang-format on // clang-format on
@ -95,6 +96,9 @@ static void parseArg(int argc, char* argv[]) {
case 'l': case 'l':
setLogLevel(optarg); setLogLevel(optarg);
break; break;
case 'q':
setQueryPolicy(optarg);
break;
default: default:
break; break;
} }

View File

@ -24,6 +24,7 @@
#include "mockCatalogService.h" #include "mockCatalogService.h"
#include "parser.h" #include "parser.h"
#include "planInt.h" #include "planInt.h"
#include "tglobal.h"
using namespace std; using namespace std;
using namespace testing; using namespace testing;
@ -53,6 +54,7 @@ DumpModule g_dumpModule = DUMP_MODULE_NOTHING;
int32_t g_skipSql = 0; int32_t g_skipSql = 0;
int32_t g_limitSql = 0; int32_t g_limitSql = 0;
int32_t g_logLevel = 131; int32_t g_logLevel = 131;
int32_t g_queryPolicy = QUERY_POLICY_VNODE;
void setDumpModule(const char* pModule) { void setDumpModule(const char* pModule) {
if (NULL == pModule) { if (NULL == pModule) {
@ -79,6 +81,7 @@ void setDumpModule(const char* pModule) {
void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); } void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); }
void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); } void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); }
void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); } void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); }
void setQueryPolicy(const char* pQueryPolicy) { g_queryPolicy = stoi(pQueryPolicy); }
int32_t getLogLevel() { return g_logLevel; } int32_t getLogLevel() { return g_logLevel; }
@ -105,7 +108,23 @@ class PlannerTestBaseImpl {
} }
++sqlNum_; ++sqlNum_;
switch (g_queryPolicy) {
case QUERY_POLICY_VNODE:
case QUERY_POLICY_HYBRID:
case QUERY_POLICY_QNODE:
runImpl(sql, g_queryPolicy);
break;
default:
runImpl(sql, QUERY_POLICY_VNODE);
runImpl(sql, QUERY_POLICY_HYBRID);
runImpl(sql, QUERY_POLICY_QNODE);
break;
}
}
void runImpl(const string& sql, int32_t queryPolicy) {
reset(); reset();
tsQueryPolicy = queryPolicy;
try { try {
SQuery* pQuery = nullptr; SQuery* pQuery = nullptr;
doParseSql(sql, &pQuery); doParseSql(sql, &pQuery);

View File

@ -45,6 +45,7 @@ extern void setDumpModule(const char* pModule);
extern void setSkipSqlNum(const char* pNum); extern void setSkipSqlNum(const char* pNum);
extern void setLimitSqlNum(const char* pNum); extern void setLimitSqlNum(const char* pNum);
extern void setLogLevel(const char* pLogLevel); extern void setLogLevel(const char* pLogLevel);
extern void setQueryPolicy(const char* pQueryPolicy);
extern int32_t getLogLevel(); extern int32_t getLogLevel();
#endif // PLAN_TEST_UTIL_H #endif // PLAN_TEST_UTIL_H

View File

@ -83,7 +83,7 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);

View File

@ -158,13 +158,13 @@ int32_t syncSetStandby(int64_t rid) {
} }
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_IS_LEADER; terrno = TSDB_CODE_SYN_IS_LEADER;
} else { } else {
terrno = TSDB_CODE_SYN_STANDBY_NOT_READY; terrno = TSDB_CODE_SYN_STANDBY_NOT_READY;
} }
sError("failed to set standby since it is not follower, state:%s rid:%" PRId64, syncStr(pSyncNode->state), rid); sError("failed to set standby since it is not follower, state:%s rid:%" PRId64, syncStr(pSyncNode->state), rid);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1; return -1;
} }
@ -620,6 +620,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
taosReleaseRef(tsNodeRefId, rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }

View File

@ -14,6 +14,7 @@
*/ */
#include "syncRespMgr.h" #include "syncRespMgr.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
@ -116,4 +117,59 @@ void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
} }
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {} void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0;
SSyncNode *pSyncNode = pObj->data;
SArray *delIndexArray = taosArrayInit(0, sizeof(SyncIndex));
ASSERT(delIndexArray != NULL);
while (pStub) {
size_t len;
void *key = taosHashGetKey(pStub, &len);
SyncIndex *pIndex = (SyncIndex *)key;
int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl) {
taosArrayPush(delIndexArray, pIndex);
cnt++;
SSyncRaftEntry *pEntry = NULL;
int32_t code = 0;
if (pSyncNode->pLogStore != NULL) {
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, *pIndex, &pEntry);
if (code == 0 && pEntry != NULL) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 0;
SRpcMsg rpcMsg = pStub->rpcMsg;
rpcMsg.pCont = rpcMallocCont(pEntry->dataLen);
memcpy(rpcMsg.pCont, pEntry->data, pEntry->dataLen);
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
syncEntryDestory(pEntry);
}
}
}
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
}
int32_t arraySize = taosArrayGetSize(delIndexArray);
sDebug("vgId:%d, resp clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
for (int32_t i = 0; i < arraySize; ++i) {
SyncIndex *pIndex = taosArrayGet(delIndexArray, i);
taosHashRemove(pObj->pRespHash, pIndex, sizeof(SyncIndex));
}
taosArrayDestroy(delIndexArray);
}

View File

@ -22,9 +22,11 @@
#include "wal.h" #include "wal.h"
//---------------------------------- //----------------------------------
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
SyncSnapshotSend *pBeginMsg); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg); static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
//---------------------------------- //----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
@ -68,7 +70,9 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
// close reader // close reader
if (pSender->pReader != NULL) { if (pSender->pReader != NULL) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
ASSERT(ret == 0); if (ret != 0) {
syncNodeErrorLog(pSender->pSyncNode, "stop reader error");
}
pSender->pReader = NULL; pSender->pReader = NULL;
} }
@ -79,7 +83,12 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
// begin send snapshot by snapshot, pReader // begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot, int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
void *pReader) { void *pReader) {
ASSERT(!snapshotSenderIsStart(pSender)); ASSERT(!snapshotSenderIsStart(pSender));
@ -98,7 +107,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
// update term // update term
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
++(pSender->privateTerm); ++(pSender->privateTerm); // increase private term
// update state // update state
pSender->finish = false; pSender->finish = false;
@ -114,9 +123,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
pSender->snapshot.lastConfigIndex, &pEntry); pSender->snapshot.lastConfigIndex, &pEntry);
if (code == 0) { if (code == 0 && pEntry != NULL) {
ASSERT(pEntry != NULL);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
@ -207,6 +214,8 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->start = false; pSender->start = false;
pSender->finish = finish; pSender->finish = finish;
// do not update term, maybe print
// event log // event log
do { do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop"); char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
@ -243,6 +252,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
@ -281,11 +291,13 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig; pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq; pMsg->seq = pSender->seq;
pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
// send msg // send msg
@ -305,6 +317,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
return 0; return 0;
} }
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->ack == pSender->seq);
pSender->ack = pMsg->ack;
++(pSender->seq);
}
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char u64buf[128]; char u64buf[128];
cJSON *pRoot = cJSON_CreateObject(); cJSON *pRoot = cJSON_CreateObject();
@ -371,10 +389,11 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
syncUtilU642Addr(destId.addr, host, sizeof(host), &port); syncUtilU642Addr(destId.addr, host, sizeof(host), &port);
snprintf(s, len, snprintf(s, len,
"%s {%p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d}", event, "%s {%p s-param:%ld e-param:%ld laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu "
pSender, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, "replica-index:%d %s:%d}",
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm, event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
pSender->replicaIndex, host, port); pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
pSender->finish, pSender->privateTerm, pSender->replicaIndex, host, port);
return s; return s;
} }
@ -429,11 +448,10 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceive
// static do start by privateTerm, pBeginMsg // static do start by privateTerm, pBeginMsg
// receive first snapshot data // receive first snapshot data
// write first block data // write first block data
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
SyncSnapshotSend *pBeginMsg) {
// update state // update state
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = privateTerm; pReceiver->privateTerm = pBeginMsg->privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = pBeginMsg->srcId; pReceiver->fromId = pBeginMsg->srcId;
pReceiver->start = true; pReceiver->start = true;
@ -445,7 +463,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
pReceiver->snapshotParam.start = pBeginMsg->beginIndex; pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
pReceiver->snapshotParam.end = pBeginMsg->lastIndex; pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
// write data // start writer
ASSERT(pReceiver->pWriter == NULL); ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
&(pReceiver->snapshotParam), &(pReceiver->pWriter)); &(pReceiver->snapshotParam), &(pReceiver->pWriter));
@ -481,10 +499,10 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again // if already start, force close, start again
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) { if (!snapshotReceiverIsStart(pReceiver)) {
// first start // first start
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); snapshotReceiverDoStart(pReceiver, pBeginMsg);
} else { } else {
// already start // already start
@ -494,12 +512,14 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm private
snapshotReceiverForceStop(pReceiver); snapshotReceiverForceStop(pReceiver);
// start again // start again
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); snapshotReceiverDoStart(pReceiver, pBeginMsg);
} }
return 0; return 0;
} }
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t ret = int32_t ret =
@ -522,6 +542,7 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
return 0; return 0;
} }
// when recv last snapshot block, apply data into snapshot
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END); ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
@ -550,7 +571,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
} }
// stop writer // stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
if (code != 0) { if (code != 0) {
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error"); syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
@ -579,15 +600,20 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
return 0; return 0;
} }
// apply data block
// update progress
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == pReceiver->ack + 1); ASSERT(pMsg->seq == pReceiver->ack + 1);
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
if (pMsg->dataLen > 0) { if (pMsg->dataLen > 0) {
// apply data block
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen); pMsg->data, pMsg->dataLen);
ASSERT(code == 0); ASSERT(code == 0);
} }
// update progress
pReceiver->ack = pMsg->seq; pReceiver->ack = pMsg->seq;
// event log // event log
@ -665,14 +691,23 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
uint16_t port; uint16_t port;
syncUtilU642Addr(fromId.addr, host, sizeof(host), &port); syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);
snprintf(s, len, "%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld}", event, snprintf(s, len,
pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port, "%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d s-param:%ld e-param:%ld laindex:%ld laterm:%lu "
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex); "lcindex:%ld}",
event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex,
pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);
return s; return s;
} }
// receiver do something // receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
// condition 4, got data, update ack
//
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver // get receiver
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
@ -683,11 +718,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// condition 1
// begin, no data // begin, no data
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg); snapshotReceiverStart(pReceiver, pMsg);
needRsp = true; needRsp = true;
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// condition 2
// end, finish FSM // end, finish FSM
code = snapshotReceiverFinish(pReceiver, pMsg); code = snapshotReceiverFinish(pReceiver, pMsg);
if (code == 0) { if (code == 0) {
@ -697,7 +734,6 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// maybe update lastconfig // maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
// int32_t oldReplicaNum = pSyncNode->replicaNum;
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
// update new config myIndex // update new config myIndex
@ -709,11 +745,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// condition 3
// force close // force close
snapshotReceiverForceStop(pReceiver); snapshotReceiverForceStop(pReceiver);
needRsp = false; needRsp = false;
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// condition 4
// transfering // transfering
if (pMsg->seq == pReceiver->ack + 1) { if (pMsg->seq == pReceiver->ack + 1) {
snapshotReceiverGotData(pReceiver, pMsg); snapshotReceiverGotData(pReceiver, pMsg);
@ -752,6 +790,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncSnapshotRspDestroy(pRspMsg); syncSnapshotRspDestroy(pRspMsg);
} }
} else { } else {
// error log // error log
do { do {
@ -759,6 +798,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeErrorLog(pSyncNode, eventLog); syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog); taosMemoryFree(eventLog);
} while (0); } while (0);
return -1;
} }
} else { } else {
// error log // error log
@ -767,19 +808,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeErrorLog(pSyncNode, eventLog); syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog); taosMemoryFree(eventLog);
} while (0); } while (0);
return -1;
} }
return 0; return 0;
} }
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { // sender on message
ASSERT(pMsg->ack == pSender->seq); //
pSender->ack = pMsg->ack; // condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
++(pSender->seq); // condition 2 sender receives ack, set seq = ack + 1, send msg from seq
} // condition 3 sender receives error msg, just print error log
//
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
@ -794,12 +835,14 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// state, term, seq/ack // state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender // condition 1
// receive ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, true);
return 0; return 0;
} }
// condition 2
// send next msg // send next msg
if (pMsg->ack == pSender->seq) { if (pMsg->ack == pSender->seq) {
// update sender ack // update sender ack
@ -807,6 +850,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
snapshotSend(pSender); snapshotSend(pSender);
} else if (pMsg->ack == pSender->seq - 1) { } else if (pMsg->ack == pSender->seq - 1) {
// maybe resend
snapshotReSend(pSender); snapshotReSend(pSender);
} else { } else {

View File

@ -8,7 +8,13 @@ void print(SHashObj *pNextIndex) {
printf("----------------\n"); printf("----------------\n");
uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL); uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL);
while (p) { while (p) {
printf("%lu \n", *p);
size_t len;
void* key = taosHashGetKey(p, &len);
SRaftId *pRaftId = (SRaftId*)key;
printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p);
p = (uint64_t *)taosHashIterate(pNextIndex, p); p = (uint64_t *)taosHashIterate(pNextIndex, p);
} }
} }

View File

@ -73,9 +73,15 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
} }
} }
SSyncNode *createSyncNode() {
SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
return pSyncNode;
}
void test1() { void test1() {
printf("------- test1 ---------\n"); printf("------- test1 ---------\n");
pMgr = syncRespMgrCreate(NULL, 0); pMgr = syncRespMgrCreate(createSyncNode(), 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(10); syncRespMgrInsert(10);
@ -100,7 +106,7 @@ void test1() {
void test2() { void test2() {
printf("------- test2 ---------\n"); printf("------- test2 ---------\n");
pMgr = syncRespMgrCreate(NULL, 0); pMgr = syncRespMgrCreate(createSyncNode(), 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(10); syncRespMgrInsert(10);
@ -117,7 +123,7 @@ void test2() {
void test3() { void test3() {
printf("------- test3 ---------\n"); printf("------- test3 ---------\n");
pMgr = syncRespMgrCreate(NULL, 0); pMgr = syncRespMgrCreate(createSyncNode(), 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(10); syncRespMgrInsert(10);
@ -132,13 +138,34 @@ void test3() {
syncRespMgrDestroy(pMgr); syncRespMgrDestroy(pMgr);
} }
void test4() {
printf("------- test4 ---------\n");
pMgr = syncRespMgrCreate(createSyncNode(), 2);
assert(pMgr != NULL);
syncRespMgrInsert(5);
syncRespMgrPrint();
taosMsleep(3000);
syncRespMgrInsert(3);
syncRespMgrPrint();
printf("====== after clean ttl \n");
syncRespClean(pMgr);
syncRespMgrPrint();
syncRespMgrDestroy(pMgr);
}
int main() { int main() {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest(); logTest();
test1(); test1();
test2(); test2();
test3(); test3();
test4();
return 0; return 0;
} }

View File

@ -14,7 +14,7 @@ sql_error alter table db.stb MODIFY tag ts int
sql_error alter table db.stb MODIFY tag t2 binary(3) sql_error alter table db.stb MODIFY tag t2 binary(3)
sql_error alter table db.stb MODIFY tag t2 int sql_error alter table db.stb MODIFY tag t2 int
sql_error alter table db.stb MODIFY tag t1 int sql_error alter table db.stb MODIFY tag t1 int
sql create table db.ctb using db.stb tags(101, "12345") sql create table db.ctb using db.stb tags(101, "123")
sql insert into db.ctb values(now, 1, "1234") sql insert into db.ctb values(now, 1, "1234")
sql select * from db.stb sql select * from db.stb
@ -32,7 +32,7 @@ endi
if $data[0][3] != 101 then if $data[0][3] != 101 then
return -1 return -1
endi endi
if $data[0][4] != 1234 then if $data[0][4] != 123 then
return -1 return -1
endi endi

View File

@ -14,7 +14,7 @@ sql_error alter table db.stb rename tag ts c3
sql_error alter table db.stb rename tag t2 t1 sql_error alter table db.stb rename tag t2 t1
sql_error alter table db.stb rename tag t2 t2 sql_error alter table db.stb rename tag t2 t2
sql_error alter table db.stb rename tag t1 t2 sql_error alter table db.stb rename tag t1 t2
sql create table db.ctb using db.stb tags(101, "12345") sql create table db.ctb using db.stb tags(101, "123")
sql insert into db.ctb values(now, 1, "1234") sql insert into db.ctb values(now, 1, "1234")
sql select * from db.stb sql select * from db.stb
@ -32,7 +32,7 @@ endi
if $data[0][3] != 101 then if $data[0][3] != 101 then
return -1 return -1
endi endi
if $data[0][4] != 1234 then if $data[0][4] != 123 then
return -1 return -1
endi endi
@ -56,7 +56,7 @@ endi
if $data[0][3] != 101 then if $data[0][3] != 101 then
return -1 return -1
endi endi
if $data[0][4] != 1234 then if $data[0][4] != 123 then
return -1 return -1
endi endi