diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index d5958e1b9c..96815ac29f 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -63,7 +63,7 @@ typedef enum ENodeType { QUERY_NODE_FILL, QUERY_NODE_RAW_EXPR, // Only be used in parser module. QUERY_NODE_TARGET, - QUERY_NODE_TUPLE_DESC, + QUERY_NODE_DATABLOCK_DESC, QUERY_NODE_SLOT_DESC, // Statement nodes are used in parser and planner module. diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index d8896501ad..608146e3df 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -71,17 +71,18 @@ typedef struct SSlotDescNode { SDataType dataType; bool reserve; bool output; + bool tag; } SSlotDescNode; -typedef struct STupleDescNode { +typedef struct SDataBlockDescNode { ENodeType type; - int16_t tupleId; + int16_t dataBlockId; SNodeList* pSlots; -} STupleDescNode; +} SDataBlockDescNode; typedef struct SPhysiNode { ENodeType type; - STupleDescNode outputTuple; + SDataBlockDescNode outputDataBlockDesc; SNode* pConditions; SNodeList* pChildren; struct SPhysiNode* pParent; @@ -104,6 +105,7 @@ typedef struct STableScanPhysiNode { SScanPhysiNode scan; uint8_t scanFlag; // denotes reversed scan of data or not STimeWindow scanRange; + SNode* pScanConditions; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index c54e1b3ad0..42f9310ef1 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -58,21 +58,13 @@ typedef struct SColumnNode { char tableAlias[TSDB_TABLE_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; SNode* pProjectRef; - int16_t tupleId; + int16_t dataBlockId; int16_t slotId; } SColumnNode; -// typedef struct SColumnRefNode { -// ENodeType type; -// SDataType dataType; -// int16_t tupleId; -// int16_t slotId; -// int16_t columnId; -// } SColumnRefNode; - typedef struct STargetNode { ENodeType type; - int16_t tupleId; + int16_t dataBlockId; int16_t slotId; SNode* pExpr; } STargetNode; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0ec741ec3e..a619e66622 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -21,7 +21,9 @@ extern "C" { #endif #include +#include #include "taosdef.h" +#include "trpc.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -34,23 +36,23 @@ typedef enum { TAOS_SYNC_STATE_LEADER = 2, } ESyncState; -typedef struct { +typedef struct SSyncBuffer { void* data; size_t len; } SSyncBuffer; -typedef struct { - SyncNodeId nodeId; - uint16_t nodePort; // node sync Port - char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN +typedef struct SNodeInfo { + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; -typedef struct { +typedef struct SSyncCfg { int32_t replicaNum; + int32_t myIndex; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCfg; -typedef struct { +typedef struct SNodesRole { int32_t replicaNum; SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; ESyncState role[TSDB_MAX_REPLICA]; @@ -128,12 +130,12 @@ typedef struct SStateMgr { } SStateMgr; -typedef struct { - SyncGroupId vgId; - SSyncCfg syncCfg; - SSyncLogStore logStore; - SStateMgr stateManager; - SSyncFSM syncFsm; +typedef struct SSyncInfo { + SyncGroupId vgId; + SSyncCfg syncCfg; + char path[TSDB_FILENAME_LEN]; + SSyncFSM* pFsm; + int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncInfo; diff --git a/include/util/tjson.h b/include/util/tjson.h index a0c2fef05b..e42e40efa7 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -28,23 +28,42 @@ SJson* tjsonCreateObject(); void tjsonDelete(SJson* pJson); SJson* tjsonAddArrayToObject(SJson* pJson, const char* pName); - int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t number); int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double number); +int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean); int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal); int32_t tjsonAddItemToObject(SJson* pJson, const char* pName, SJson* pItem); int32_t tjsonAddItemToArray(SJson* pJson, SJson* pItem); +SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName); +int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal); +int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal); +int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal); +int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal); +int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pVal); +int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal); +int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal); +int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal); +int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal); +int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal); + +int32_t tjsonGetArraySize(const SJson* pJson); +SJson* tjsonGetArrayItem(const SJson* pJson, int32_t index); + typedef int32_t (*FToJson)(const void* pObj, SJson* pJson); int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj); int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj); -typedef int32_t (*FFromJson)(const SJson* pJson, void* pObj); +typedef int32_t (*FToObject)(const SJson* pJson, void* pObj); + +int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj); char* tjsonToString(const SJson* pJson); char* tjsonToUnformattedString(const SJson* pJson); +SJson* tjsonParse(const char* pStr); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 73ce67bd28..d9eeb6eeeb 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -29,6 +29,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = NULL, .finalizeFunc = NULL }, + { + .name = "sum", + .type = FUNCTION_TYPE_SUM, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .processFunc = NULL, + .finalizeFunc = NULL + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 7c5ec19212..aeff20d488 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -192,7 +192,6 @@ TEST_F(UtilEnv, 03union) { for (int i = 0; i < sizeof(arr2) / sizeof(arr2[0]); i++) { taosArrayPush(f, &arr2[i]); } - uint64_t arr3[] = {1, 12, 13, 16, 17}; f = (SArray *)taosArrayGetP(src, 2); for (int i = 0; i < sizeof(arr3) / sizeof(arr3[0]); i++) { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 864e13b773..e23c9ebe9d 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -72,6 +72,8 @@ static SNode* columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) { COPY_CHAR_ARRAY_FIELD(tableAlias); COPY_CHAR_ARRAY_FIELD(colName); // COPY_NODE_FIELD(pProjectRef); + COPY_SCALAR_FIELD(dataBlockId); + COPY_SCALAR_FIELD(slotId); return (SNode*)pDst; } @@ -143,7 +145,7 @@ static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) { } static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) { - COPY_SCALAR_FIELD(tupleId); + COPY_SCALAR_FIELD(dataBlockId); COPY_SCALAR_FIELD(slotId); COPY_NODE_FIELD(pExpr); return (SNode*)pDst; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 92a6126750..b1fd4bb56f 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -20,6 +20,9 @@ #include "tjson.h" static int32_t nodeToJson(const void* pObj, SJson* pJson); +static int32_t jsonToNode(const SJson* pJson, void* pObj); +static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** pNode); +static int32_t makeNodeByJson(const SJson* pJson, SNode** pNode); static char* nodeName(ENodeType type) { switch (type) { @@ -59,7 +62,7 @@ static char* nodeName(ENodeType type) { return "Target"; case QUERY_NODE_RAW_EXPR: return "RawExpr"; - case QUERY_NODE_TUPLE_DESC: + case QUERY_NODE_DATABLOCK_DESC: return "TupleDesc"; case QUERY_NODE_SLOT_DESC: return "SlotDesc"; @@ -83,6 +86,10 @@ static char* nodeName(ENodeType type) { return "PhysiTableScan"; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return "PhysiProject"; + case QUERY_NODE_PHYSICAL_PLAN_JOIN: + return "PhysiJoin"; + case QUERY_NODE_PHYSICAL_PLAN_AGG: + return "PhysiAgg"; default: break; } @@ -91,7 +98,7 @@ static char* nodeName(ENodeType type) { return tmp; } -static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const SNodeList* pList) { +static int32_t nodeListToJson(SJson* pJson, const char* pName, const SNodeList* pList) { if (LIST_LENGTH(pList) > 0) { SJson* jList = tjsonAddArrayToObject(pJson, pName); if (NULL == jList) { @@ -99,7 +106,7 @@ static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const } SNode* pNode; FOREACH(pNode, pList) { - int32_t code = tjsonAddItem(jList, func, pNode); + int32_t code = tjsonAddItem(jList, nodeToJson, pNode); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -108,6 +115,31 @@ static int32_t addNodeList(SJson* pJson, const char* pName, FToJson func, const return TSDB_CODE_SUCCESS; } +static int32_t jsonToNodeList(const SJson* pJson, const char* pName, SNodeList** pList) { + const SJson* pJsonArray = tjsonGetObjectItem(pJson, pName); + int32_t size = (NULL == pJsonArray ? 0 : tjsonGetArraySize(pJsonArray)); + if (size > 0) { + *pList = nodesMakeList(); + if (NULL == *pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; i < size; ++i) { + SJson* pJsonItem = tjsonGetArrayItem(pJsonArray, i); + SNode* pNode = NULL; + code = makeNodeByJson(pJsonItem, &pNode); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListAppend(*pList, pNode); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + static const char* jkTableMetaUid = "TableMetaUid"; static const char* jkTableMetaSuid = "TableMetaSuid"; @@ -132,13 +164,13 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { int32_t code = tjsonAddIntegerToObject(pJson, jkLogicPlanId, pNode->id); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkLogicPlanTargets, nodeToJson, pNode->pTargets); + code = nodeListToJson(pJson, jkLogicPlanTargets, pNode->pTargets); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkLogicPlanConditions, nodeToJson, pNode->pConditions); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkLogicPlanChildren, nodeToJson, pNode->pChildren); + code = nodeListToJson(pJson, jkLogicPlanChildren, pNode->pChildren); } return code; @@ -152,7 +184,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { int32_t code = logicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkScanLogicPlanScanCols, nodeToJson, pNode->pScanCols); + code = nodeListToJson(pJson, jkScanLogicPlanScanCols, pNode->pScanCols); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta); @@ -168,7 +200,7 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) { int32_t code = logicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkProjectLogicPlanProjections, nodeToJson, pNode->pProjections); + code = nodeListToJson(pJson, jkProjectLogicPlanProjections, pNode->pProjections); } return code; @@ -191,19 +223,33 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { return code; } -static const char* jkPhysiPlanOutputTuple = "OutputTuple"; +static const char* jkPhysiPlanOutputDataBlockDesc = "OutputDataBlockDesc"; static const char* jkPhysiPlanConditions = "Conditions"; static const char* jkPhysiPlanChildren = "Children"; static int32_t physicPlanNodeToJson(const void* pObj, SJson* pJson) { const SPhysiNode* pNode = (const SPhysiNode*)pObj; - int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputTuple, nodeToJson, &pNode->outputTuple); + int32_t code = tjsonAddObject(pJson, jkPhysiPlanOutputDataBlockDesc, nodeToJson, &pNode->outputDataBlockDesc); if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkPhysiPlanConditions, nodeToJson, pNode->pConditions); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkPhysiPlanChildren, nodeToJson, pNode->pChildren); + code = nodeListToJson(pJson, jkPhysiPlanChildren, pNode->pChildren); + } + + return code; +} + +static int32_t jsonToPhysicPlanNode(const SJson* pJson, void* pObj) { + SPhysiNode* pNode = (SPhysiNode*)pObj; + + int32_t code = tjsonToObject(pJson, jkPhysiPlanOutputDataBlockDesc, jsonToNode, &pNode->outputDataBlockDesc); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkPhysiPlanConditions, &pNode->pConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkPhysiPlanChildren, &pNode->pChildren); } return code; @@ -221,7 +267,7 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkScanPhysiPlanScanCols, nodeToJson, pNode->pScanCols); + code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid); @@ -242,10 +288,40 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { + STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanOrder, &pNode->order); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanCount, &pNode->count); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkScanPhysiPlanReverseScanCount, &pNode->reverse); + } + + return code; +} + static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) { return physiScanNodeToJson(pObj, pJson); } +static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { + return jsonToPhysiScanNode(pJson, pObj); +} + static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag"; static const char* jkTableScanPhysiPlanStartKey = "StartKey"; static const char* jkTableScanPhysiPlanEndKey = "EndKey"; @@ -267,6 +343,23 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { + STableScanPhysiNode* pNode = (STableScanPhysiNode*)pObj; + + int32_t code = jsonToPhysiScanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanFlag, &pNode->scanFlag); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->scanRange.skey); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanEndKey, &pNode->scanRange.ekey); + } + + return code; +} + static const char* jkProjectPhysiPlanProjections = "Projections"; static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { @@ -274,7 +367,96 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkProjectPhysiPlanProjections, nodeToJson, pNode->pProjections); + code = nodeListToJson(pJson, jkProjectPhysiPlanProjections, pNode->pProjections); + } + + return code; +} + +static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { + SProjectPhysiNode* pNode = (SProjectPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkProjectPhysiPlanProjections, &pNode->pProjections); + } + + return code; +} + +static const char* jkJoinPhysiPlanJoinType = "JoinType"; +static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; +static const char* jkJoinPhysiPlanTargets = "Targets"; + +static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { + const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); + } + + return code; +} + +static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { + SJoinPhysiNode* pNode = (SJoinPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + int32_t val; + code = tjsonGetIntValue(pJson, jkJoinPhysiPlanJoinType, &val); + pNode->joinType = val; + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); + } + + return code; +} + +static const char* jkAggPhysiPlanExprs = "Exprs"; +static const char* jkAggPhysiPlanGroupKeys = "GroupKeys"; +static const char* jkAggPhysiPlanAggFuncs = "AggFuncs"; + +static int32_t physiAggNodeToJson(const void* pObj, SJson* pJson) { + const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkAggPhysiPlanExprs, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkAggPhysiPlanGroupKeys, pNode->pGroupKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkAggPhysiPlanAggFuncs, pNode->pAggFuncs); + } + + return code; +} + +static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { + SAggPhysiNode* pNode = (SAggPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkAggPhysiPlanExprs, &pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkAggPhysiPlanGroupKeys, &pNode->pGroupKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkAggPhysiPlanAggFuncs, &pNode->pAggFuncs); } return code; @@ -288,10 +470,10 @@ static int32_t logicAggNodeToJson(const void* pObj, SJson* pJson) { int32_t code = logicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkAggLogicPlanGroupKeys, nodeToJson, pNode->pGroupKeys); + code = nodeListToJson(pJson, jkAggLogicPlanGroupKeys, pNode->pGroupKeys); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkAggLogicPlanAggFuncs, nodeToJson, pNode->pAggFuncs); + code = nodeListToJson(pJson, jkAggLogicPlanAggFuncs, pNode->pAggFuncs); } return code; @@ -319,6 +501,23 @@ static int32_t dataTypeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToDataType(const SJson* pJson, void* pObj) { + SDataType* pNode = (SDataType*)pObj; + + int32_t code = tjsonGetUTinyIntValue(pJson, jkDataTypeType, &pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUTinyIntValue(pJson, jkDataTypePrecision, &pNode->precision); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUTinyIntValue(pJson, jkDataTypeScale, &pNode->scale); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkDataTypeDataBytes, &pNode->bytes); + } + + return TSDB_CODE_SUCCESS; +} + static const char* jkExprDataType = "DataType"; static const char* jkExprAliasName = "AliasName"; @@ -333,6 +532,17 @@ static int32_t exprNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToExprNode(const SJson* pJson, void* pObj) { + SExprNode* pNode = (SExprNode*)pObj; + + int32_t code = tjsonToObject(pJson, jkExprDataType, jsonToDataType, &pNode->resType); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkExprAliasName, pNode->aliasName); + } + + return code; +} + static const char* jkColumnTableId = "TableId"; static const char* jkColumnColId = "ColId"; static const char* jkColumnColType = "ColType"; @@ -340,6 +550,8 @@ static const char* jkColumnDbName = "DbName"; static const char* jkColumnTableName = "TableName"; static const char* jkColumnTableAlias = "TableAlias"; static const char* jkColumnColName = "ColName"; +static const char* jkColumnDataBlockId = "DataBlockId"; +static const char* jkColumnSlotId = "SlotId"; static int32_t columnNodeToJson(const void* pObj, SJson* pJson) { const SColumnNode* pNode = (const SColumnNode*)pObj; @@ -366,6 +578,49 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddStringToObject(pJson, jkColumnColName, pNode->colName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkColumnDataBlockId, pNode->dataBlockId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkColumnSlotId, pNode->slotId); + } + + return code; +} + +static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) { + SColumnNode* pNode = (SColumnNode*)pObj; + + int32_t code = jsonToExprNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkColumnTableId, &pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId); + } + if (TSDB_CODE_SUCCESS == code) { + int32_t tmp; + code = tjsonGetIntValue(pJson, jkColumnColType, &tmp); + pNode->colType = tmp; + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkColumnDbName, pNode->dbName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkColumnTableName, pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkColumnTableAlias, pNode->tableAlias); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkColumnColName, pNode->colName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkColumnDataBlockId, &pNode->dataBlockId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkColumnSlotId, &pNode->slotId); + } return code; } @@ -382,7 +637,7 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddStringToObject(pJson, jkValueLiteral, pNode->literal); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkValueDuration, pNode->isDuration); + code = tjsonAddBoolToObject(pJson, jkValueDuration, pNode->isDuration); } switch (pNode->node.resType.type) { case TSDB_DATA_TYPE_NULL: @@ -424,6 +679,56 @@ static int32_t valueNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToValueNode(const SJson* pJson, void* pObj) { + SValueNode* pNode = (SValueNode*)pObj; + + int32_t code = jsonToExprNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonDupStringValue(pJson, jkValueLiteral, &pNode->literal); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkValueDuration, &pNode->isDuration); + } + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_NULL: + break; + case TSDB_DATA_TYPE_BOOL: + code = tjsonGetBoolValue(pJson, jkValueDuration, &pNode->datum.b); + break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + code = tjsonGetBigIntValue(pJson, jkValueDuration, &pNode->datum.i); + break; + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + code = tjsonGetUBigIntValue(pJson, jkValueDuration, &pNode->datum.u); + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + code = tjsonGetDoubleValue(pJson, jkValueDuration, &pNode->datum.d); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + code = tjsonDupStringValue(pJson, jkValueLiteral, &pNode->datum.p); + break; + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + // todo + default: + break; + } + + return code; +} + static const char* jkOperatorType = "OpType"; static const char* jkOperatorLeft = "Left"; static const char* jkOperatorRight = "Right"; @@ -445,6 +750,25 @@ static int32_t operatorNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToOperatorNode(const SJson* pJson, void* pObj) { + SOperatorNode* pNode = (SOperatorNode*)pObj; + + int32_t code = jsonToExprNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + int32_t val; + code = tjsonGetIntValue(pJson, jkOperatorType, &val); + pNode->opType = val; + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkOperatorLeft, &pNode->pLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkOperatorRight, &pNode->pRight); + } + + return code; +} + static const char* jkLogicCondType = "CondType"; static const char* jkLogicCondParameters = "Parameters"; @@ -456,7 +780,23 @@ static int32_t logicConditionNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkLogicCondType, pNode->condType); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkLogicCondParameters, nodeToJson, pNode->pParameterList); + code = nodeListToJson(pJson, jkLogicCondParameters, pNode->pParameterList); + } + + return code; +} + +static int32_t jsonToLogicConditionNode(const SJson* pJson, void* pObj) { + SLogicConditionNode* pNode = (SLogicConditionNode*)pObj; + + int32_t code = jsonToExprNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + int32_t val; + code = tjsonGetIntValue(pJson, jkLogicCondType, &val); + pNode->condType = val; + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkLogicCondParameters, &pNode->pParameterList); } return code; @@ -481,7 +821,27 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkFunctionType, pNode->funcType); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkFunctionParameter, nodeToJson, pNode->pParameterList); + code = nodeListToJson(pJson, jkFunctionParameter, pNode->pParameterList); + } + + return code; +} + +static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) { + SFunctionNode* pNode = (SFunctionNode*)pObj; + + int32_t code = jsonToExprNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkFunctionName, pNode->functionName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkFunctionId, &pNode->funcId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkFunctionType, &pNode->funcType); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkFunctionParameter, &pNode->pParameterList); } return code; @@ -495,20 +855,20 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) { int32_t code = tjsonAddIntegerToObject(pJson, jkGroupingSetType, pNode->groupingSetType); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkGroupingSetParameter, nodeToJson, pNode->pParameterList); + code = nodeListToJson(pJson, jkGroupingSetParameter, pNode->pParameterList); } return code; } -static const char* jkTargetTupleId = "TupleId"; +static const char* jkTargetDataBlockId = "DataBlockId"; static const char* jkTargetSlotId = "SlotId"; static const char* jkTargetExpr = "Expr"; static int32_t targetNodeToJson(const void* pObj, SJson* pJson) { const STargetNode* pNode = (const STargetNode*)pObj; - int32_t code = tjsonAddIntegerToObject(pJson, jkTargetTupleId, pNode->tupleId); + int32_t code = tjsonAddIntegerToObject(pJson, jkTargetDataBlockId, pNode->dataBlockId); if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTargetSlotId, pNode->slotId); } @@ -519,8 +879,24 @@ static int32_t targetNodeToJson(const void* pObj, SJson* pJson) { return code; } +static int32_t jsonToTargetNode(const SJson* pJson, void* pObj) { + STargetNode* pNode = (STargetNode*)pObj; + + int32_t code = tjsonGetSmallIntValue(pJson, jkTargetDataBlockId, &pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkTargetSlotId, &pNode->slotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkTargetExpr, &pNode->pExpr); + } + + return code; +} + static const char* jkSlotDescSlotId = "SlotId"; static const char* jkSlotDescDataType = "DataType"; +static const char* jkSlotDescReserve = "Reserve"; +static const char* jkSlotDescOutput = "Output"; static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) { const SSlotDescNode* pNode = (const SSlotDescNode*)pObj; @@ -529,19 +905,53 @@ static int32_t slotDescNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSlotDescDataType, dataTypeToJson, &pNode->dataType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSlotDescReserve, pNode->reserve); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSlotDescOutput, pNode->output); + } return code; } -static const char* jkTupleDescTupleId = "TupleId"; -static const char* jkTupleDescSlots = "Slots"; +static int32_t jsonToSlotDescNode(const SJson* pJson, void* pObj) { + SSlotDescNode* pNode = (SSlotDescNode*)pObj; -static int32_t tupleDescNodeToJson(const void* pObj, SJson* pJson) { - const STupleDescNode* pNode = (const STupleDescNode*)pObj; - - int32_t code = tjsonAddIntegerToObject(pJson, jkTupleDescTupleId, pNode->tupleId); + int32_t code = tjsonGetSmallIntValue(pJson, jkSlotDescSlotId, &pNode->slotId); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkTupleDescSlots, nodeToJson, pNode->pSlots); + code = tjsonToObject(pJson, jkSlotDescDataType, jsonToDataType, &pNode->dataType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSlotDescReserve, &pNode->reserve); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSlotDescOutput, &pNode->output); + } + + return code; +} + +static const char* jkDataBlockDescDataBlockId = "DataBlockId"; +static const char* jkDataBlockDescSlots = "Slots"; + +static int32_t dataBlockDescNodeToJson(const void* pObj, SJson* pJson) { + const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkDataBlockDescDataBlockId, pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkDataBlockDescSlots, pNode->pSlots); + } + + return code; +} + +static int32_t jsonToDataBlockDescNode(const SJson* pJson, void* pObj) { + SDataBlockDescNode* pNode = (SDataBlockDescNode*)pObj; + + int32_t code = tjsonGetSmallIntValue(pJson, jkDataBlockDescDataBlockId, &pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkDataBlockDescSlots, &pNode->pSlots); } return code; @@ -564,7 +974,7 @@ static int32_t selectStmtTojson(const void* pObj, SJson* pJson) { int32_t code = tjsonAddIntegerToObject(pJson, jkSelectStmtDistinct, pNode->isDistinct); if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkSelectStmtProjections, nodeToJson, pNode->pProjectionList); + code = nodeListToJson(pJson, jkSelectStmtProjections, pNode->pProjectionList); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSelectStmtFrom, nodeToJson, pNode->pFromTable); @@ -573,19 +983,19 @@ static int32_t selectStmtTojson(const void* pObj, SJson* pJson) { code = tjsonAddObject(pJson, jkSelectStmtWhere, nodeToJson, pNode->pWhere); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkSelectStmtPartitionBy, nodeToJson, pNode->pPartitionByList); + code = nodeListToJson(pJson, jkSelectStmtPartitionBy, pNode->pPartitionByList); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSelectStmtWindow, nodeToJson, pNode->pWindow); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkSelectStmtGroupBy, nodeToJson, pNode->pGroupByList); + code = nodeListToJson(pJson, jkSelectStmtGroupBy, pNode->pGroupByList); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSelectStmtHaving, nodeToJson, pNode->pHaving); } if (TSDB_CODE_SUCCESS == code) { - code = addNodeList(pJson, jkSelectStmtOrderBy, nodeToJson, pNode->pOrderByList); + code = nodeListToJson(pJson, jkSelectStmtOrderBy, pNode->pOrderByList); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSelectStmtLimit, nodeToJson, pNode->pLimit); @@ -626,8 +1036,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return targetNodeToJson(pObj, pJson); case QUERY_NODE_RAW_EXPR: break; - case QUERY_NODE_TUPLE_DESC: - return tupleDescNodeToJson(pObj, pJson); + case QUERY_NODE_DATABLOCK_DESC: + return dataBlockDescNodeToJson(pObj, pJson); case QUERY_NODE_SLOT_DESC: return slotDescNodeToJson(pObj, pJson); case QUERY_NODE_SET_OPERATOR: @@ -650,6 +1060,73 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return physiProjectNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_JOIN: + return physiJoinNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_AGG: + return physiAggNodeToJson(pObj, pJson); + default: + break; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { + switch (nodeType(pObj)) { + case QUERY_NODE_COLUMN: + return jsonToColumnNode(pJson, pObj); + case QUERY_NODE_VALUE: + return jsonToValueNode(pJson, pObj); + case QUERY_NODE_OPERATOR: + return jsonToOperatorNode(pJson, pObj); + case QUERY_NODE_LOGIC_CONDITION: + return jsonToLogicConditionNode(pJson, pObj); + case QUERY_NODE_FUNCTION: + return jsonToFunctionNode(pJson, pObj); + // case QUERY_NODE_REAL_TABLE: + // case QUERY_NODE_TEMP_TABLE: + // case QUERY_NODE_JOIN_TABLE: + // break; + // case QUERY_NODE_GROUPING_SET: + // return jsonToGroupingSetNode(pJson, pObj); + // case QUERY_NODE_ORDER_BY_EXPR: + // case QUERY_NODE_LIMIT: + // case QUERY_NODE_STATE_WINDOW: + // case QUERY_NODE_SESSION_WINDOW: + // case QUERY_NODE_INTERVAL_WINDOW: + // case QUERY_NODE_NODE_LIST: + // case QUERY_NODE_FILL: + case QUERY_NODE_TARGET: + return jsonToTargetNode(pJson, pObj); + // case QUERY_NODE_RAW_EXPR: + // break; + case QUERY_NODE_DATABLOCK_DESC: + return jsonToDataBlockDescNode(pJson, pObj); + case QUERY_NODE_SLOT_DESC: + return jsonToSlotDescNode(pJson, pObj); + // case QUERY_NODE_SET_OPERATOR: + // break; + // case QUERY_NODE_SELECT_STMT: + // return jsonToSelectStmt(pJson, pObj); + // case QUERY_NODE_SHOW_STMT: + // break; + // case QUERY_NODE_LOGIC_PLAN_SCAN: + // return jsonToLogicScanNode(pJson, pObj); + // case QUERY_NODE_LOGIC_PLAN_JOIN: + // return jsonToLogicJoinNode(pJson, pObj); + // case QUERY_NODE_LOGIC_PLAN_AGG: + // return jsonToLogicAggNode(pJson, pObj); + // case QUERY_NODE_LOGIC_PLAN_PROJECT: + // return jsonToLogicProjectNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + return jsonToPhysiTagScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + return jsonToPhysiTableScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + return jsonToPhysiProjectNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_JOIN: + return jsonToPhysiJoinNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_AGG: + return jsonToPhysiAggNode(pJson, pObj); default: break; } @@ -657,18 +1134,57 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { } static const char* jkNodeType = "Type"; +static const char* jkNodeName = "Name"; + static int32_t nodeToJson(const void* pObj, SJson* pJson) { const SNode* pNode = (const SNode*)pObj; - char* pNodeName = nodeName(nodeType(pNode)); - int32_t code = tjsonAddStringToObject(pJson, jkNodeType, pNodeName); + int32_t code = tjsonAddIntegerToObject(pJson, jkNodeType, pNode->type); if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddObject(pJson, pNodeName, specificNodeToJson, pNode); + code = tjsonAddStringToObject(pJson, jkNodeName, nodeName(pNode->type)); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, nodeName(pNode->type), specificNodeToJson, pNode); } return code; } +static int32_t jsonToNode(const SJson* pJson, void* pObj) { + SNode* pNode = (SNode*)pObj; + + int32_t val = 0; + int32_t code = tjsonGetIntValue(pJson, jkNodeType, &val); + pNode->type = val; + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, nodeName(pNode->type), jsonToSpecificNode, pNode); + } + + return code; +} + +static int32_t makeNodeByJson(const SJson* pJson, SNode** pNode) { + int32_t val = 0; + int32_t code = tjsonGetIntValue(pJson, jkNodeType, &val); + if (TSDB_CODE_SUCCESS == code) { + *pNode = nodesMakeNode(val); + if (NULL == *pNode) { + return TSDB_CODE_FAILED; + } + code = jsonToNode(pJson, *pNode); + } + + return code; +} + +static int32_t jsonToNodeObject(const SJson* pJson, const char* pName, SNode** pNode) { + SJson* pJsonNode = tjsonGetObjectItem(pJson, pName); + if (NULL == pJsonNode) { + return TSDB_CODE_FAILED; + } + return makeNodeByJson(pJsonNode, pNode); +} + int32_t nodesNodeToString(const SNode* pNode, bool format, char** pStr, int32_t* pLen) { if (NULL == pNode || NULL == pStr || NULL == pLen) { return TSDB_CODE_SUCCESS; @@ -694,5 +1210,18 @@ int32_t nodesNodeToString(const SNode* pNode, bool format, char** pStr, int32_t* } int32_t nodesStringToNode(const char* pStr, SNode** pNode) { + if (NULL == pStr || NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + SJson* pJson = tjsonParse(pStr); + if (NULL == pJson) { + return TSDB_CODE_FAILED; + } + int32_t code = makeNodeByJson(pJson, pNode); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(*pNode); + terrno = code; + return code; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8810f24ef0..5d51b2e523 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -81,8 +81,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SProjectLogicNode)); case QUERY_NODE_TARGET: return makeNode(type, sizeof(STargetNode)); - case QUERY_NODE_TUPLE_DESC: - return makeNode(type, sizeof(STupleDescNode)); + case QUERY_NODE_DATABLOCK_DESC: + return makeNode(type, sizeof(SDataBlockDescNode)); case QUERY_NODE_SLOT_DESC: return makeNode(type, sizeof(SSlotDescNode)); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index 8ed67d7f2f..457d586ea4 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -29,9 +29,10 @@ namespace { void generateTestT1(MockCatalogService* mcs) { - ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 4) + ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6) .setPrecision(TSDB_TIME_PRECISION_MILLI).setVgid(1).addColumn("ts", TSDB_DATA_TYPE_TIMESTAMP) - .addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20).addColumn("c3", TSDB_DATA_TYPE_BIGINT); + .addColumn("c1", TSDB_DATA_TYPE_INT).addColumn("c2", TSDB_DATA_TYPE_BINARY, 20).addColumn("c3", TSDB_DATA_TYPE_BIGINT) + .addColumn("c4", TSDB_DATA_TYPE_DOUBLE).addColumn("c5", TSDB_DATA_TYPE_DOUBLE); builder.done(); } diff --git a/source/libs/planner/src/plannerImpl.c b/source/libs/planner/src/plannerImpl.c index d5b5eb1500..b7e28d70db 100644 --- a/source/libs/planner/src/plannerImpl.c +++ b/source/libs/planner/src/plannerImpl.c @@ -159,6 +159,10 @@ static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan); } + pScan->scanType = SCAN_TYPE_TABLE; + pScan->scanFlag = MAIN_SCAN; + pScan->scanRange = TSWINDOW_INITIALIZER; + return (SLogicNode*)pScan; } @@ -397,14 +401,14 @@ int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) { } typedef struct SSlotIndex { - int16_t tupleId; + int16_t dataBlockId; int16_t slotId; } SSlotIndex; typedef struct SPhysiPlanContext { int32_t errCode; - int16_t nextTupleId; - SArray* pTupleHelper; + int16_t nextDataBlockId; + SArray* pLocationHelper; } SPhysiPlanContext; static int32_t getSlotKey(SNode* pNode, char* pKey) { @@ -428,31 +432,31 @@ static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const SNode* pNode, int16_ return (SNode*)pSlot; } -static SNode* createTarget(SNode* pNode, int16_t tupleId, int16_t slotId) { +static SNode* createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId) { STargetNode* pTarget = (STargetNode*)nodesMakeNode(QUERY_NODE_TARGET); if (NULL == pTarget) { return NULL; } - pTarget->tupleId = tupleId; + pTarget->dataBlockId = dataBlockId; pTarget->slotId = slotId; pTarget->pExpr = pNode; return (SNode*)pTarget; } -static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDescNode* pTuple) { +static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) { SHashObj* pHash = NULL; - if (NULL == pTuple->pSlots) { - pTuple->pSlots = nodesMakeList(); - CHECK_ALLOC(pTuple->pSlots, TSDB_CODE_OUT_OF_MEMORY); + if (NULL == pDataBlockDesc->pSlots) { + pDataBlockDesc->pSlots = nodesMakeList(); + CHECK_ALLOC(pDataBlockDesc->pSlots, TSDB_CODE_OUT_OF_MEMORY); pHash = taosHashInit(LIST_LENGTH(pList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY); - if (NULL == taosArrayInsert(pCxt->pTupleHelper, pTuple->tupleId, &pHash)) { + if (NULL == taosArrayInsert(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId, &pHash)) { taosHashCleanup(pHash); return TSDB_CODE_OUT_OF_MEMORY; } } else { - pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId); + pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); } SNode* pNode = NULL; @@ -460,17 +464,17 @@ static int32_t addTupleDesc(SPhysiPlanContext* pCxt, SNodeList* pList, STupleDes FOREACH(pNode, pList) { SNode* pSlot = createSlotDesc(pCxt, pNode, slotId); CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY); - if (TSDB_CODE_SUCCESS != nodesListAppend(pTuple->pSlots, (SNode*)pSlot)) { + if (TSDB_CODE_SUCCESS != nodesListAppend(pDataBlockDesc->pSlots, (SNode*)pSlot)) { nodesDestroyNode(pSlot); return TSDB_CODE_OUT_OF_MEMORY; } - SSlotIndex index = { .tupleId = pTuple->tupleId, .slotId = slotId }; + SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId }; char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; int32_t len = getSlotKey(pNode, name); CHECK_CODE(taosHashPut(pHash, name, len, &index, sizeof(SSlotIndex)), TSDB_CODE_OUT_OF_MEMORY); - SNode* pTarget = createTarget(pNode, pTuple->tupleId, slotId); + SNode* pTarget = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId); CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY); REPLACE_NODE(pTarget); @@ -495,7 +499,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { pIndex = taosHashGet(pCxt->pRightHash, name, len); } // pIndex is definitely not NULL, otherwise it is a bug - ((SColumnNode*)pNode)->tupleId = pIndex->tupleId; + ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; ((SColumnNode*)pNode)->slotId = pIndex->slotId; CHECK_ALLOC(pNode, DEAL_RES_ERROR); return DEAL_RES_IGNORE_CHILD; @@ -503,11 +507,11 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_t rightTupleId, SNode* pNode) { +static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode) { SNode* pRes = nodesCloneNode(pNode); CHECK_ALLOC(pRes, NULL); - SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pTupleHelper, leftTupleId), - .pRightHash = (rightTupleId < 0 ? NULL : taosArrayGetP(pCxt->pTupleHelper, rightTupleId)) }; + SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId), + .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)) }; nodesWalkNode(pRes, doSetSlotId, &cxt); if (TSDB_CODE_SUCCESS != cxt.errCode) { nodesDestroyNode(pRes); @@ -516,11 +520,11 @@ static SNode* setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_ return pRes; } -static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftTupleId, int16_t rightTupleId, SNodeList* pList) { +static SNodeList* setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNodeList* pList) { SNodeList* pRes = nodesCloneList(pList); CHECK_ALLOC(pRes, NULL); - SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pTupleHelper, leftTupleId), - .pRightHash = (rightTupleId < 0 ? NULL : taosArrayGetP(pCxt->pTupleHelper, rightTupleId)) }; + SSetSlotIdCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId), + .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)) }; nodesWalkList(pRes, doSetSlotId, &cxt); if (TSDB_CODE_SUCCESS != cxt.errCode) { nodesDestroyList(pRes); @@ -534,27 +538,27 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, ENodeType type) { if (NULL == pPhysiNode) { return NULL; } - pPhysiNode->outputTuple.tupleId = pCxt->nextTupleId++; - pPhysiNode->outputTuple.type = QUERY_NODE_TUPLE_DESC; + pPhysiNode->outputDataBlockDesc.dataBlockId = pCxt->nextDataBlockId++; + pPhysiNode->outputDataBlockDesc.type = QUERY_NODE_DATABLOCK_DESC; return pPhysiNode; } static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) { if (NULL != pLogicNode->pConditions) { - pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputTuple.tupleId, -1, pLogicNode->pConditions); + pPhysiNode->pConditions = setNodeSlotId(pCxt, pPhysiNode->outputDataBlockDesc.dataBlockId, -1, pLogicNode->pConditions); CHECK_ALLOC(pPhysiNode->pConditions, TSDB_CODE_OUT_OF_MEMORY); } return TSDB_CODE_SUCCESS; } -static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, STupleDescNode* pTuple) { - SHashObj* pHash = taosArrayGetP(pCxt->pTupleHelper, pTuple->tupleId); +static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SDataBlockDescNode* pDataBlockDesc) { + SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; SNode* pNode; FOREACH(pNode, pTargets) { int32_t len = getSlotKey(pNode, name); SSlotIndex* pIndex = taosHashGet(pHash, name, len); - ((SSlotDescNode*)nodesListGetNode(pTuple->pSlots, pIndex->slotId))->output = true; + ((SSlotDescNode*)nodesListGetNode(pDataBlockDesc->pSlots, pIndex->slotId))->output = true; } return TSDB_CODE_SUCCESS; @@ -565,12 +569,12 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols); CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); } - // Tuple describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t - CHECK_CODE(addTupleDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputTuple), TSDB_CODE_OUT_OF_MEMORY); + // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t + CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY); CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputTuple), TSDB_CODE_OUT_OF_MEMORY); + CHECK_CODE(setSlotOutput(pCxt, pScanLogicNode->node.pTargets, &pScanPhysiNode->node.outputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY); pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; @@ -612,32 +616,32 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* return NULL; } -static SNodeList* createJoinOutputCols(SPhysiPlanContext* pCxt, STupleDescNode* pLeftTuple, STupleDescNode* pRightTuple) { +static SNodeList* createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode* pLeftDesc, SDataBlockDescNode* pRightDesc) { SNodeList* pCols = nodesMakeList(); CHECK_ALLOC(pCols, NULL); SNode* pNode; - FOREACH(pNode, pLeftTuple->pSlots) { + FOREACH(pNode, pLeftDesc->pSlots) { SSlotDescNode* pSlot = (SSlotDescNode*)pNode; SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { goto error; } pCol->node.resType = pSlot->dataType; - pCol->tupleId = pLeftTuple->tupleId; + pCol->dataBlockId = pLeftDesc->dataBlockId; pCol->slotId = pSlot->slotId; pCol->colId = -1; if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) { goto error; } } - FOREACH(pNode, pRightTuple->pSlots) { + FOREACH(pNode, pRightDesc->pSlots) { SSlotDescNode* pSlot = (SSlotDescNode*)pNode; SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { goto error; } pCol->node.resType = pSlot->dataType; - pCol->tupleId = pRightTuple->tupleId; + pCol->dataBlockId = pRightDesc->dataBlockId; pCol->slotId = pSlot->slotId; pCol->colId = -1; if (TSDB_CODE_SUCCESS != nodesListAppend(pCols, (SNode*)pCol)) { @@ -654,18 +658,18 @@ static SPhysiNode* createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_JOIN); CHECK_ALLOC(pJoin, NULL); - STupleDescNode* pLeftTuple = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple; - STupleDescNode* pRightTuple = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputTuple; - pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftTuple->tupleId, pRightTuple->tupleId, pJoinLogicNode->pOnConditions); + SDataBlockDescNode* pLeftDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc; + SDataBlockDescNode* pRightDesc = &((SPhysiNode*)nodesListGetNode(pChildren, 1))->outputDataBlockDesc; + pJoin->pOnConditions = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions); CHECK_ALLOC(pJoin->pOnConditions, (SPhysiNode*)pJoin); - pJoin->pTargets = createJoinOutputCols(pCxt, pLeftTuple, pRightTuple); + pJoin->pTargets = createJoinOutputCols(pCxt, pLeftDesc, pRightDesc); CHECK_ALLOC(pJoin->pTargets, (SPhysiNode*)pJoin); - CHECK_CODE(addTupleDesc(pCxt, pJoin->pTargets, &pJoin->node.outputTuple), (SPhysiNode*)pJoin); + CHECK_CODE(addDataBlockDesc(pCxt, pJoin->pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin); CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin), (SPhysiNode*)pJoin); - CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputTuple), (SPhysiNode*)pJoin); + CHECK_CODE(setSlotOutput(pCxt, pJoinLogicNode->node.pTargets, &pJoin->node.outputDataBlockDesc), (SPhysiNode*)pJoin); return (SPhysiNode*)pJoin; } @@ -689,9 +693,14 @@ static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) nodesDestroyNode(pExpr); return DEAL_RES_ERROR; } - SExprNode* pToBeRewrittenExpr = (SExprNode*)(*pNode); - pCol->node.resType = pToBeRewrittenExpr->resType; - strcpy(pCol->colName, pToBeRewrittenExpr->aliasName); + SExprNode* pRewrittenExpr = (SExprNode*)pExpr; + pCol->node.resType = pRewrittenExpr->resType; + if ('\0' != pRewrittenExpr->aliasName[0]) { + strcpy(pCol->colName, pRewrittenExpr->aliasName); + } else { + snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId, pCxt->rewriteId); + strcpy(pCol->colName, pRewrittenExpr->aliasName); + } nodesDestroyNode(*pNode); *pNode = (SNode*)pCol; return DEAL_RES_IGNORE_CHILD; @@ -758,29 +767,29 @@ static SPhysiNode* createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys), (SPhysiNode*)pAgg); CHECK_CODE(rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs), (SPhysiNode*)pAgg); - STupleDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple); - // push down expression to outputTuple of child node + SDataBlockDescNode* pChildTupe = &(((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc); + // push down expression to outputDataBlockDesc of child node if (NULL != pPrecalcExprs) { - pAgg->pExprs = setListSlotId(pCxt, pChildTupe->tupleId, -1, pPrecalcExprs); + pAgg->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs); CHECK_ALLOC(pAgg->pExprs, (SPhysiNode*)pAgg); - CHECK_CODE(addTupleDesc(pCxt, pAgg->pExprs, pChildTupe), (SPhysiNode*)pAgg); + CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pExprs, pChildTupe), (SPhysiNode*)pAgg); } if (NULL != pGroupKeys) { - pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->tupleId, -1, pGroupKeys); + pAgg->pGroupKeys = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys); CHECK_ALLOC(pAgg->pGroupKeys, (SPhysiNode*)pAgg); - CHECK_CODE(addTupleDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputTuple), (SPhysiNode*)pAgg); + CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pGroupKeys, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg); } if (NULL != pAggFuncs) { - pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->tupleId, -1, pAggFuncs); + pAgg->pAggFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs); CHECK_ALLOC(pAgg->pAggFuncs, (SPhysiNode*)pAgg); - CHECK_CODE(addTupleDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputTuple), (SPhysiNode*)pAgg); + CHECK_CODE(addDataBlockDesc(pCxt, pAgg->pAggFuncs, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg); } CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg), (SPhysiNode*)pAgg); - CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputTuple), (SPhysiNode*)pAgg); + CHECK_CODE(setSlotOutput(pCxt, pAggLogicNode->node.pTargets, &pAgg->node.outputDataBlockDesc), (SPhysiNode*)pAgg); return (SPhysiNode*)pAgg; } @@ -789,9 +798,9 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_PROJECT); CHECK_ALLOC(pProject, NULL); - pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputTuple.tupleId, -1, pProjectLogicNode->pProjections); + pProject->pProjections = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->outputDataBlockDesc.dataBlockId, -1, pProjectLogicNode->pProjections); CHECK_ALLOC(pProject->pProjections, (SPhysiNode*)pProject); - CHECK_CODE(addTupleDesc(pCxt, pProject->pProjections, &pProject->node.outputTuple), (SPhysiNode*)pProject); + CHECK_CODE(addDataBlockDesc(pCxt, pProject->pProjections, &pProject->node.outputDataBlockDesc), (SPhysiNode*)pProject); CHECK_CODE(setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject), (SPhysiNode*)pProject); @@ -840,8 +849,8 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl } int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) { - SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextTupleId = 0, .pTupleHelper = taosArrayInit(32, POINTER_BYTES) }; - if (NULL == cxt.pTupleHelper) { + SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextDataBlockId = 0, .pLocationHelper = taosArrayInit(32, POINTER_BYTES) }; + if (NULL == cxt.pLocationHelper) { return TSDB_CODE_OUT_OF_MEMORY; } *pPhyNode = createPhysiNode(&cxt, pLogicNode); diff --git a/source/libs/planner/test/newPlannerTest.cpp b/source/libs/planner/test/newPlannerTest.cpp index 51ef52ac2d..e7c0c24830 100644 --- a/source/libs/planner/test/newPlannerTest.cpp +++ b/source/libs/planner/test/newPlannerTest.cpp @@ -62,7 +62,7 @@ protected: return false; } - cout << "sql : [" << cxt_.pSql << "]" << endl; + cout << "====================sql : [" << cxt_.pSql << "]" << endl; cout << "syntax test : " << endl; cout << syntaxTreeStr << endl; cout << "unformatted logic plan : " << endl; @@ -123,8 +123,8 @@ TEST_F(NewPlannerTest, simple) { TEST_F(NewPlannerTest, groupBy) { setDatabase("root", "test"); - // bind("SELECT count(*) FROM t1"); - // ASSERT_TRUE(run()); + bind("SELECT count(*) FROM t1"); + ASSERT_TRUE(run()); bind("SELECT c1, count(*) FROM t1 GROUP BY c1"); ASSERT_TRUE(run()); @@ -132,7 +132,7 @@ TEST_F(NewPlannerTest, groupBy) { bind("SELECT c1 + c3, c1 + count(*) FROM t1 where c2 = 'abc' GROUP BY c1, c3"); ASSERT_TRUE(run()); - bind("SELECT c1 + c3, count(*) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3"); + bind("SELECT c1 + c3, sum(c4 * c5) FROM t1 where concat(c2, 'wwww') = 'abcwww' GROUP BY c1 + c3"); ASSERT_TRUE(run()); } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 5eb0003662..4d6656b061 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1455,7 +1455,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options) for (uint32_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) { SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i]; SColumnNode *refNode = (SColumnNode *)field->desc; - qDebug("COL%d => [%d][%d]", i, refNode->tupleId, refNode->slotId); + qDebug("COL%d => [%d][%d]", i, refNode->dataBlockId, refNode->slotId); } qDebug("VALUE Field Num:%u", info->fields[FLD_TYPE_VALUE].num); @@ -1485,7 +1485,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options) SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit); SColumnNode *refNode = (SColumnNode *)left->desc; if (unit->compare.optr >= 0 && unit->compare.optr <= OP_TYPE_JSON_CONTAINS){ - len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr].str); + len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->dataBlockId, refNode->slotId, gOptrStr[unit->compare.optr].str); } if (unit->right.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) { @@ -1504,7 +1504,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options) if (unit->compare.optr2) { strcat(str, " && "); if (unit->compare.optr2 >= 0 && unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS){ - sprintf(str + strlen(str), "[%d][%d] %s [", refNode->tupleId, refNode->slotId, gOptrStr[unit->compare.optr2].str); + sprintf(str + strlen(str), "[%d][%d] %s [", refNode->dataBlockId, refNode->slotId, gOptrStr[unit->compare.optr2].str); } if (unit->right2.type == FLD_TYPE_VALUE && FILTER_UNIT_OPTR(unit) != OP_TYPE_IN) { diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index dc45f14366..a0618cc662 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -80,7 +80,7 @@ void flttMakeColRefNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in SColumnNode *rnode = (SColumnNode *)node; rnode->node.resType.type = dataType; rnode->node.resType.bytes = dataBytes; - rnode->tupleId = 0; + rnode->dataBlockId = 0; if (NULL == block) { rnode->slotId = 2; diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index df746e19cf..086594ddf5 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -79,7 +79,7 @@ void scltMakeColRefNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in SColumnNode *rnode = (SColumnNode *)node; rnode->node.resType.type = dataType; rnode->node.resType.bytes = dataBytes; - rnode->tupleId = 0; + rnode->dataBlockId = 0; if (NULL == *block) { SSDataBlock *res = (SSDataBlock *)calloc(1, sizeof(SSDataBlock)); diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index 37ee5194c8..cb38d7e363 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -13,4 +13,8 @@ target_include_directories( sync PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" -) \ No newline at end of file +) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 9ca0de19c5..b7c1c051cc 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 8b5cbf1da5..22f8eb464f 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 34dfdb3d09..7e9e637854 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h new file mode 100644 index 0000000000..f1c4327b69 --- /dev/null +++ b/source/libs/sync/inc/syncEnv.h @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_ENV_H +#define _TD_LIBS_SYNC_ENV_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" +#include "trpc.h" + +typedef struct SSyncEnv { + void *pTimer; + void *pTimerManager; +} SSyncEnv; + +int32_t syncEnvStart(); + +int32_t syncEnvStop(); + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_ENV_H*/ diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h new file mode 100644 index 0000000000..4b788efd79 --- /dev/null +++ b/source/libs/sync/inc/syncIO.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_IO_H +#define _TD_LIBS_IO_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "os.h" +#include "syncInt.h" +#include "taosdef.h" +#include "tqueue.h" +#include "trpc.h" + +typedef struct SSyncIO { + void * serverRpc; + void * clientRpc; + STaosQueue *pMsgQ; + STaosQset * pQset; + pthread_t tid; + int8_t isStart; + + SEpSet epSet; + + void *syncTimer; + void *syncTimerManager; + + int32_t (*start)(struct SSyncIO *ths); + int32_t (*stop)(struct SSyncIO *ths); + int32_t (*ping)(struct SSyncIO *ths); + + int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); + int32_t (*destroy)(struct SSyncIO *ths); + + void *pSyncNode; + int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg); + +} SSyncIO; + +extern SSyncIO *gSyncIO; + +int32_t syncIOStart(); +int32_t syncIOStop(); +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); +SSyncIO *syncIOCreate(); + +static int32_t doSyncIOStart(SSyncIO *io); +static int32_t doSyncIOStop(SSyncIO *io); +static int32_t doSyncIOPing(SSyncIO *io); +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t doSyncIODestroy(SSyncIO *io); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_IO_H*/ diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 551ce83122..ad8484662a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -23,7 +23,11 @@ extern "C" { #include #include #include +#include "sync.h" #include "taosdef.h" +#include "tlog.h" + +extern int32_t sDebugFlag; #define sFatal(...) \ { \ @@ -62,16 +66,81 @@ extern "C" { } \ } +struct SRaft; +typedef struct SRaft SRaft; + +struct SyncPing; +typedef struct SyncPing SyncPing; + +struct SyncPingReply; +typedef struct SyncPingReply SyncPingReply; + +struct SyncRequestVote; +typedef struct SyncRequestVote SyncRequestVote; + +struct SyncRequestVoteReply; +typedef struct SyncRequestVoteReply SyncRequestVoteReply; + +struct SyncAppendEntries; +typedef struct SyncAppendEntries SyncAppendEntries; + +struct SyncAppendEntriesReply; +typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; + typedef struct SSyncNode { - char path[TSDB_FILENAME_LEN]; - int8_t replica; - int8_t quorum; - int8_t selfIndex; - uint32_t vgId; - int32_t refCount; - int64_t rid; + int8_t replica; + int8_t quorum; + + SyncGroupId vgId; + SSyncCfg syncCfg; + char path[TSDB_FILENAME_LEN]; + + SRaft* pRaft; + + int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg); + + int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); + + int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); + + int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg); + + int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); + + int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + + int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg); + + int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + + int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + } SSyncNode; +SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); + +void syncNodeClose(SSyncNode* pSyncNode); + +static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); + +static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); + +static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); + +static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); + +static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); + +static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + +static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); + +static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); + +static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index f410c8cf6e..2ee5e0109c 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -41,26 +41,26 @@ typedef enum ESyncMessageType { typedef struct SyncPing { ESyncMessageType msgType; const SSyncBuffer *pData; -} SyncPing; +} SyncPing, RaftPing; typedef struct SyncPingReply { ESyncMessageType msgType; const SSyncBuffer *pData; -} SyncPingReply; +} SyncPingReply, RaftPingReply; typedef struct SyncClientRequest { ESyncMessageType msgType; const SSyncBuffer *pData; int64_t seqNum; bool isWeak; -} SyncClientRequest; +} SyncClientRequest, RaftClientRequest; typedef struct SyncClientRequestReply { ESyncMessageType msgType; int32_t errCode; const SSyncBuffer *pErrMsg; const SSyncBuffer *pLeaderHint; -} SyncClientRequestReply; +} SyncClientRequestReply, RaftClientRequestReply; typedef struct SyncRequestVote { ESyncMessageType msgType; @@ -69,7 +69,7 @@ typedef struct SyncRequestVote { SyncGroupId vgId; SyncIndex lastLogIndex; SyncTerm lastLogTerm; -} SyncRequestVote; +} SyncRequestVote, RaftRequestVote; typedef struct SyncRequestVoteReply { ESyncMessageType msgType; @@ -77,7 +77,7 @@ typedef struct SyncRequestVoteReply { SyncNodeId nodeId; SyncGroupId vgId; bool voteGranted; -} SyncRequestVoteReply; +} SyncRequestVoteReply, RaftRequestVoteReply; typedef struct SyncAppendEntries { ESyncMessageType msgType; @@ -88,7 +88,7 @@ typedef struct SyncAppendEntries { int32_t entryCount; SSyncRaftEntry * logEntries; SyncIndex commitIndex; -} SyncAppendEntries; +} SyncAppendEntries, RaftAppendEntries; typedef struct SyncAppendEntriesReply { ESyncMessageType msgType; @@ -96,7 +96,7 @@ typedef struct SyncAppendEntriesReply { SyncNodeId nodeId; bool success; SyncIndex matchIndex; -} SyncAppendEntriesReply; +} SyncAppendEntriesReply, RaftAppendEntriesReply; #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index 0c7e573572..a247a29fc4 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -28,14 +28,56 @@ extern "C" { #include "taosdef.h" typedef struct SRaftId { - SyncNodeId nodeId; + SyncNodeId addr; SyncGroupId vgId; } SRaftId; typedef struct SRaft { - SRaftId id; + SRaftId id; + SSyncFSM* pFsm; + + int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); + + int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg); + + int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg); + + int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg); + + int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg); + + int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg); + + int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg); + + int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg); + } SRaft; +SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm); + +void raftClose(SRaft* pRaft); + +static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); + +static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg); + +static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg); + +static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg); + +static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg); + +static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg); + +static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg); + +static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg); + +static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg); + int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index adc82f2c5d..516bef4d48 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_LIBS_TPL_H -#define _TD_LIBS_TPL_H +#ifndef _TD_LIBS_SYNC_RAFT_ENTRY_H +#define _TD_LIBS_SYNC_RAFT_ENTRY_H #ifdef __cplusplus extern "C" { @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "taosdef.h" typedef struct SSyncRaftEntry { @@ -37,4 +37,4 @@ typedef struct SSyncRaftEntry { } #endif -#endif /*_TD_LIBS_TPL_H*/ +#endif /*_TD_LIBS_SYNC_RAFT_ENTRY_H*/ diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 8c4b5116ea..ee971062cf 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "taosdef.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 4cb852f34a..0fdbd7a150 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -23,17 +23,36 @@ extern "C" { #include #include #include -#include "sync.h" +#include "cJSON.h" +#include "syncInt.h" #include "syncRaft.h" #include "taosdef.h" -int32_t currentTerm(SyncTerm *pCurrentTerm); +#define RAFT_STORE_BLOCK_SIZE 512 +#define RAFT_STORE_PATH_LEN 128 -int32_t persistCurrentTerm(SyncTerm currentTerm); +typedef struct SRaftStore { + SyncTerm currentTerm; + SRaftId voteFor; + //FileFd fd; + char path[RAFT_STORE_PATH_LEN]; +} SRaftStore; -int32_t voteFor(SRaftId *pRaftId); +SRaftStore *raftStoreOpen(const char *path); -int32_t persistVoteFor(SRaftId *pRaftId); +static int32_t raftStoreInit(SRaftStore *pRaftStore); + +int32_t raftStoreClose(SRaftStore *pRaftStore); + +int32_t raftStorePersist(SRaftStore *pRaftStore); + +static bool raftStoreFileExist(char *path); + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); + +void raftStorePrint(SRaftStore *pRaftStore); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 40c5ff790b..7f97ae9e49 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 3ff96bbe8f..c2eca55151 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 033ac89bc2..38068dd0e2 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 3b6121578a..89fcb230fb 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 8159d2566c..d9d6a17939 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h new file mode 100644 index 0000000000..cfcf58bee2 --- /dev/null +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_VOTG_MGR_H +#define _TD_LIBS_SYNC_VOTG_MGR_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_VOTG_MGR_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1286108664..2b9c59ec92 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -14,104 +14,98 @@ */ #include "syncAppendEntries.h" -#include "sync.h" void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { - -// TLA+ Spec -//AppendEntries(i, j) == -// /\ i /= j -// /\ state[i] = Leader -// /\ LET prevLogIndex == nextIndex[i][j] - 1 -// prevLogTerm == IF prevLogIndex > 0 THEN -// log[i][prevLogIndex].term -// ELSE -// 0 -// \* Send up to 1 entry, constrained by the end of the log. -// lastEntry == Min({Len(log[i]), nextIndex[i][j]}) -// entries == SubSeq(log[i], nextIndex[i][j], lastEntry) -// IN Send([mtype |-> AppendEntriesRequest, -// mterm |-> currentTerm[i], -// mprevLogIndex |-> prevLogIndex, -// mprevLogTerm |-> prevLogTerm, -// mentries |-> entries, -// \* mlog is used as a history variable for the proof. -// \* It would not exist in a real implementation. -// mlog |-> log[i], -// mcommitIndex |-> Min({commitIndex[i], lastEntry}), -// msource |-> i, -// mdest |-> j]) -// /\ UNCHANGED <> - + // TLA+ Spec + // AppendEntries(i, j) == + // /\ i /= j + // /\ state[i] = Leader + // /\ LET prevLogIndex == nextIndex[i][j] - 1 + // prevLogTerm == IF prevLogIndex > 0 THEN + // log[i][prevLogIndex].term + // ELSE + // 0 + // \* Send up to 1 entry, constrained by the end of the log. + // lastEntry == Min({Len(log[i]), nextIndex[i][j]}) + // entries == SubSeq(log[i], nextIndex[i][j], lastEntry) + // IN Send([mtype |-> AppendEntriesRequest, + // mterm |-> currentTerm[i], + // mprevLogIndex |-> prevLogIndex, + // mprevLogTerm |-> prevLogTerm, + // mentries |-> entries, + // \* mlog is used as a history variable for the proof. + // \* It would not exist in a real implementation. + // mlog |-> log[i], + // mcommitIndex |-> Min({commitIndex[i], lastEntry}), + // msource |-> i, + // mdest |-> j]) + // /\ UNCHANGED <> } void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { - -// TLA+ Spec -//HandleAppendEntriesRequest(i, j, m) == -// LET logOk == \/ m.mprevLogIndex = 0 -// \/ /\ m.mprevLogIndex > 0 -// /\ m.mprevLogIndex <= Len(log[i]) -// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term -// IN /\ m.mterm <= currentTerm[i] -// /\ \/ /\ \* reject request -// \/ m.mterm < currentTerm[i] -// \/ /\ m.mterm = currentTerm[i] -// /\ state[i] = Follower -// /\ \lnot logOk -// /\ Reply([mtype |-> AppendEntriesResponse, -// mterm |-> currentTerm[i], -// msuccess |-> FALSE, -// mmatchIndex |-> 0, -// msource |-> i, -// mdest |-> j], -// m) -// /\ UNCHANGED <> -// \/ \* return to follower state -// /\ m.mterm = currentTerm[i] -// /\ state[i] = Candidate -// /\ state' = [state EXCEPT ![i] = Follower] -// /\ UNCHANGED <> -// \/ \* accept request -// /\ m.mterm = currentTerm[i] -// /\ state[i] = Follower -// /\ logOk -// /\ LET index == m.mprevLogIndex + 1 -// IN \/ \* already done with request -// /\ \/ m.mentries = << >> -// \/ /\ m.mentries /= << >> -// /\ Len(log[i]) >= index -// /\ log[i][index].term = m.mentries[1].term -// \* This could make our commitIndex decrease (for -// \* example if we process an old, duplicated request), -// \* but that doesn't really affect anything. -// /\ commitIndex' = [commitIndex EXCEPT ![i] = -// m.mcommitIndex] -// /\ Reply([mtype |-> AppendEntriesResponse, -// mterm |-> currentTerm[i], -// msuccess |-> TRUE, -// mmatchIndex |-> m.mprevLogIndex + -// Len(m.mentries), -// msource |-> i, -// mdest |-> j], -// m) -// /\ UNCHANGED <> -// \/ \* conflict: remove 1 entry -// /\ m.mentries /= << >> -// /\ Len(log[i]) >= index -// /\ log[i][index].term /= m.mentries[1].term -// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> -// log[i][index2]] -// IN log' = [log EXCEPT ![i] = new] -// /\ UNCHANGED <> -// \/ \* no conflict: append entry -// /\ m.mentries /= << >> -// /\ Len(log[i]) = m.mprevLogIndex -// /\ log' = [log EXCEPT ![i] = -// Append(log[i], m.mentries[1])] -// /\ UNCHANGED <> -// /\ UNCHANGED <> -// - - + // TLA+ Spec + // HandleAppendEntriesRequest(i, j, m) == + // LET logOk == \/ m.mprevLogIndex = 0 + // \/ /\ m.mprevLogIndex > 0 + // /\ m.mprevLogIndex <= Len(log[i]) + // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term + // IN /\ m.mterm <= currentTerm[i] + // /\ \/ /\ \* reject request + // \/ m.mterm < currentTerm[i] + // \/ /\ m.mterm = currentTerm[i] + // /\ state[i] = Follower + // /\ \lnot logOk + // /\ Reply([mtype |-> AppendEntriesResponse, + // mterm |-> currentTerm[i], + // msuccess |-> FALSE, + // mmatchIndex |-> 0, + // msource |-> i, + // mdest |-> j], + // m) + // /\ UNCHANGED <> + // \/ \* return to follower state + // /\ m.mterm = currentTerm[i] + // /\ state[i] = Candidate + // /\ state' = [state EXCEPT ![i] = Follower] + // /\ UNCHANGED <> + // \/ \* accept request + // /\ m.mterm = currentTerm[i] + // /\ state[i] = Follower + // /\ logOk + // /\ LET index == m.mprevLogIndex + 1 + // IN \/ \* already done with request + // /\ \/ m.mentries = << >> + // \/ /\ m.mentries /= << >> + // /\ Len(log[i]) >= index + // /\ log[i][index].term = m.mentries[1].term + // \* This could make our commitIndex decrease (for + // \* example if we process an old, duplicated request), + // \* but that doesn't really affect anything. + // /\ commitIndex' = [commitIndex EXCEPT ![i] = + // m.mcommitIndex] + // /\ Reply([mtype |-> AppendEntriesResponse, + // mterm |-> currentTerm[i], + // msuccess |-> TRUE, + // mmatchIndex |-> m.mprevLogIndex + + // Len(m.mentries), + // msource |-> i, + // mdest |-> j], + // m) + // /\ UNCHANGED <> + // \/ \* conflict: remove 1 entry + // /\ m.mentries /= << >> + // /\ Len(log[i]) >= index + // /\ log[i][index].term /= m.mentries[1].term + // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> + // log[i][index2]] + // IN log' = [log EXCEPT ![i] = new] + // /\ UNCHANGED <> + // \/ \* no conflict: append entry + // /\ m.mentries /= << >> + // /\ Len(log[i]) = m.mprevLogIndex + // /\ log' = [log EXCEPT ![i] = + // Append(log[i], m.mentries[1])] + // /\ UNCHANGED <> + // /\ UNCHANGED <> + // } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 4a9055e172..05734237b9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,21 +14,18 @@ */ #include "syncAppendEntriesReply.h" -#include "sync.h" void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { - -// TLA+ Spec -//HandleAppendEntriesResponse(i, j, m) == -// /\ m.mterm = currentTerm[i] -// /\ \/ /\ m.msuccess \* successful -// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] -// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] -// \/ /\ \lnot m.msuccess \* not successful -// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = -// Max({nextIndex[i][j] - 1, 1})] -// /\ UNCHANGED <> -// /\ Discard(m) -// /\ UNCHANGED <> - + // TLA+ Spec + // HandleAppendEntriesResponse(i, j, m) == + // /\ m.mterm = currentTerm[i] + // /\ \/ /\ m.msuccess \* successful + // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] + // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] + // \/ /\ \lnot m.msuccess \* not successful + // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = + // Max({nextIndex[i][j] - 1, 1})] + // /\ UNCHANGED <> + // /\ Discard(m) + // /\ UNCHANGED <> } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 738fc4c5e1..329105e2a1 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncElection.h" diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c new file mode 100644 index 0000000000..e71cf55cb1 --- /dev/null +++ b/source/libs/sync/src/syncEnv.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncEnv.h" +#include + +SSyncEnv *gSyncEnv = NULL; + +int32_t syncEnvStart() { + int32_t ret; + gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); + assert(gSyncEnv != NULL); + ret = doSyncEnvStart(gSyncEnv); + return ret; +} + +int32_t syncEnvStop() { + int32_t ret = doSyncEnvStop(gSyncEnv); + return ret; +} + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; } + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c new file mode 100644 index 0000000000..bb20d11e37 --- /dev/null +++ b/source/libs/sync/src/syncIO.c @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncIO.h" +#include +#include "syncOnMessage.h" +#include "tglobal.h" +#include "ttimer.h" +#include "tutil.h" + +SSyncIO *gSyncIO = NULL; + +int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } + +int32_t syncIOStart() { return 0; } + +int32_t syncIOStop() { return 0; } + +static void syncTick(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sDebug("syncTick ... "); + + SRpcMsg rpcMsg; + rpcMsg.pCont = rpcMallocCont(10); + snprintf(rpcMsg.pCont, 10, "TICK"); + rpcMsg.contLen = 10; + rpcMsg.handle = NULL; + rpcMsg.msgType = 2; + + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); + + io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); +} + +void *syncConsumer(void *param) { + SSyncIO *io = param; + + STaosQall *qall; + SRpcMsg * pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); + + while (1) { + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + sDebug("%d sync-io msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; + + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); + } + + taosResetQitems(qall); + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + rpcFreeCont(pRpcMsg->pCont); + + if (pRpcMsg->handle != NULL) { + int msgSize = 128; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; + rpcSendResponse(&rpcMsg); + } + + taosFreeQitem(pRpcMsg); + } + } + + taosFreeQall(qall); + return NULL; +} + +static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + return ret; +} + +static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sDebug("processResponse ... "); + rpcFreeCont(pMsg->pCont); +} + +static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SSyncIO *io = pParent; + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); + taosWriteQitem(io->pMsgQ, pTemp); +} + +SSyncIO *syncIOCreate() { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); + + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); + + io->start = doSyncIOStart; + io->stop = doSyncIOStop; + io->ping = doSyncIOPing; + io->onMsg = doSyncIOOnMsg; + io->destroy = doSyncIODestroy; + + return io; +} + +static int32_t doSyncIOStart(SSyncIO *io) { + taosBlockSIGPIPE(); + + tsRpcForceTcp = 1; + + // cient rpc init + { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "SYNC-IO-CLIENT"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processResponse; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = "sync-io"; + rpcInit.secret = "sync-io"; + rpcInit.ckey = "key"; + rpcInit.spi = 0; + rpcInit.connType = TAOS_CONN_CLIENT; + + io->clientRpc = rpcOpen(&rpcInit); + if (io->clientRpc == NULL) { + sError("failed to initialize RPC"); + return -1; + } + } + + // server rpc init + { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 38000; + rpcInit.label = "SYNC-IO-SERVER"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = processRequestMsg; + rpcInit.sessions = 1000; + rpcInit.idleTime = 2 * 1500; + rpcInit.afp = retrieveAuthInfo; + rpcInit.parent = io; + rpcInit.connType = TAOS_CONN_SERVER; + + void *pRpc = rpcOpen(&rpcInit); + if (pRpc == NULL) { + sError("failed to start RPC server"); + return -1; + } + } + + io->epSet.inUse = 0; + addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); + + // start consumer thread + { + if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { + sError("failed to create sync consumer thread since %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + + // start tmr thread + io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + + return 0; +} + +static int32_t doSyncIOStop(SSyncIO *io) { + atomic_store_8(&io->isStart, 0); + pthread_join(io->tid, NULL); + return 0; +} + +static int32_t doSyncIOPing(SSyncIO *io) { + SRpcMsg rpcMsg, rspMsg; + + rpcMsg.pCont = rpcMallocCont(10); + snprintf(rpcMsg.pCont, 10, "ping"); + rpcMsg.contLen = 10; + rpcMsg.handle = NULL; + rpcMsg.msgType = 1; + + rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); + + return 0; +} + +static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } + +static int32_t doSyncIODestroy(SSyncIO *io) { + int8_t start = atomic_load_8(&io->isStart); + assert(start == 0); + + if (io->serverRpc != NULL) { + free(io->serverRpc); + io->serverRpc = NULL; + } + + if (io->clientRpc != NULL) { + free(io->clientRpc); + io->clientRpc = NULL; + } + + if (io->pMsgQ != NULL) { + free(io->pMsgQ); + io->pMsgQ = NULL; + } + + if (io->pQset != NULL) { + free(io->pQset); + io->pQset = NULL; + } + + return 0; +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index fbb969eb1c..bd2952505e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -16,12 +16,17 @@ #include #include "sync.h" #include "syncInt.h" +#include "syncRaft.h" int32_t syncInit() { return 0; } void syncCleanUp() {} -int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } +int64_t syncStart(const SSyncInfo* pSyncInfo) { + SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); + assert(pSyncNode != NULL); + return 0; +} void syncStop(int64_t rid) {} @@ -31,4 +36,74 @@ int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { r ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} \ No newline at end of file +void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} + +SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { + SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); + assert(pSyncNode != NULL); + + pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + + pSyncNode->FpPing = doSyncNodePing; + pSyncNode->FpOnPing = onSyncNodePing; + pSyncNode->FpOnPingReply = onSyncNodePingReply; + pSyncNode->FpRequestVote = doSyncNodeRequestVote; + pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; + pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; + pSyncNode->FpAppendEntries = doSyncNodeAppendEntries; + pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; + pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; + + return pSyncNode; +} + +void syncNodeClose(SSyncNode* pSyncNode) { + assert(pSyncNode != NULL); + raftClose(pSyncNode->pRaft); + free(pSyncNode); +} + +static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { + int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { + int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { + int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg); + return ret; +} + +static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { + int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { + int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { + int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg); + return ret; +} + +static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { + int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg); + return ret; +} + +static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg); + return ret; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index dcfc940f76..8937303725 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,7 +14,6 @@ */ #include "syncMessage.h" -#include "sync.h" #include "syncRaft.h" void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncOnMessage.c b/source/libs/sync/src/syncOnMessage.c index 738fc4c5e1..19a97ee156 100644 --- a/source/libs/sync/src/syncOnMessage.c +++ b/source/libs/sync/src/syncOnMessage.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncOnMessage.h" diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index 85c2c6fe27..9f139730d1 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -16,6 +16,51 @@ #include "syncRaft.h" #include "sync.h" +SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { + SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); + assert(pRaft != NULL); + + pRaft->id = raftId; + pRaft->pFsm = pFsm; + + pRaft->FpPing = doRaftPing; + pRaft->FpOnPing = onRaftPing; + pRaft->FpOnPingReply = onRaftPingReply; + + pRaft->FpRequestVote = doRaftRequestVote; + pRaft->FpOnRequestVote = onRaftRequestVote; + pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply; + + pRaft->FpAppendEntries = doRaftAppendEntries; + pRaft->FpOnAppendEntries = onRaftAppendEntries; + pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply; + + return pRaft; +} + +void raftClose(SRaft* pRaft) { + assert(pRaft != NULL); + free(pRaft); +} + +static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } + +static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } + +static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; } + +static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; } + +static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; } + +static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; } + +static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; } + +static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; } + +static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; } + int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 738fc4c5e1..e525d3c7c2 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncRaftEntry.h" diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 4a5fc201b0..37bb3ce48c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,7 +14,6 @@ */ #include "syncRaftLog.h" -#include "sync.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index d45e53132c..964cc78490 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -14,12 +14,142 @@ */ #include "syncRaftStore.h" -#include "sync.h" +#include "cJSON.h" -int32_t currentTerm(SyncTerm *pCurrentTerm) { return 0; } +// to complie success: FileIO interface is modified -int32_t persistCurrentTerm(SyncTerm currentTerm) { return 0; } +SRaftStore *raftStoreOpen(const char *path) { return NULL;} -int32_t voteFor(SRaftId *pRaftId) { return 0; } +static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;} -int32_t persistVoteFor(SRaftId *pRaftId) { return 0; } \ No newline at end of file +int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;} + +int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;} + +static bool raftStoreFileExist(char *path) { return 0;} + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + +void raftStorePrint(SRaftStore *pRaftStore) {} + + + +#if 0 + +SRaftStore *raftStoreOpen(const char *path) { + int32_t ret; + + SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); + if (pRaftStore == NULL) { + sError("raftStoreOpen malloc error"); + return NULL; + } + memset(pRaftStore, 0, sizeof(*pRaftStore)); + snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); + + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + memset(storeBuf, 0, sizeof(storeBuf)); + + if (!raftStoreFileExist(pRaftStore->path)) { + ret = raftStoreInit(pRaftStore); + assert(ret == 0); + } + + pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path); + if (pRaftStore->fd < 0) { + return NULL; + } + + int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); + assert(len == RAFT_STORE_BLOCK_SIZE); + + ret = raftStoreDeserialize(pRaftStore, storeBuf, len); + assert(ret == 0); + + return pRaftStore; +} + +static int32_t raftStoreInit(SRaftStore *pRaftStore) { + pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path); + if (pRaftStore->fd < 0) { + return -1; + } + + pRaftStore->currentTerm = 0; + pRaftStore->voteFor.addr = 0; + pRaftStore->voteFor.vgId = 0; + + int32_t ret = raftStorePersist(pRaftStore); + assert(ret == 0); + + taosCloseFile(pRaftStore->fd); + return 0; +} + +int32_t raftStoreClose(SRaftStore *pRaftStore) { + taosCloseFile(pRaftStore->fd); + free(pRaftStore); + return 0; +} + +int32_t raftStorePersist(SRaftStore *pRaftStore) { + int32_t ret; + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + + ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + assert(ret == 0); + + taosLSeekFile(pRaftStore->fd, 0, SEEK_SET); + + ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); + assert(ret == RAFT_STORE_BLOCK_SIZE); + + fsync(pRaftStore->fd); + return 0; +} + +static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { + cJSON *pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); + cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); + cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); + + char *serialized = cJSON_Print(pRoot); + int len2 = strlen(serialized); + assert(len2 < len); + memset(buf, 0, len); + snprintf(buf, len, "%s", serialized); + free(serialized); + + cJSON_Delete(pRoot); + return 0; +} + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { + assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); + cJSON *pRoot = cJSON_Parse(buf); + + cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); + pRaftStore->currentTerm = pCurrentTerm->valueint; + + cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); + pRaftStore->voteFor.addr = pVoteForAddr->valueint; + + cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); + pRaftStore->voteFor.vgId = pVoteForVgid->valueint; + + cJSON_Delete(pRoot); + return 0; +} + +void raftStorePrint(SRaftStore *pRaftStore) { + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + printf("%s\n", storeBuf); +} + +#endif diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 738fc4c5e1..4cea7c150e 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncReplication.h" diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index c31ec0f34d..7aee47b8e4 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -14,46 +14,41 @@ */ #include "syncRequestVote.h" -#include "sync.h" void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { - -// TLA+ Spec -//RequestVote(i, j) == -// /\ state[i] = Candidate -// /\ j \notin votesResponded[i] -// /\ Send([mtype |-> RequestVoteRequest, -// mterm |-> currentTerm[i], -// mlastLogTerm |-> LastTerm(log[i]), -// mlastLogIndex |-> Len(log[i]), -// msource |-> i, -// mdest |-> j]) -// /\ UNCHANGED <> - + // TLA+ Spec + // RequestVote(i, j) == + // /\ state[i] = Candidate + // /\ j \notin votesResponded[i] + // /\ Send([mtype |-> RequestVoteRequest, + // mterm |-> currentTerm[i], + // mlastLogTerm |-> LastTerm(log[i]), + // mlastLogIndex |-> Len(log[i]), + // msource |-> i, + // mdest |-> j]) + // /\ UNCHANGED <> } void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { - -// TLA+ Spec -//HandleRequestVoteRequest(i, j, m) == -// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) -// \/ /\ m.mlastLogTerm = LastTerm(log[i]) -// /\ m.mlastLogIndex >= Len(log[i]) -// grant == /\ m.mterm = currentTerm[i] -// /\ logOk -// /\ votedFor[i] \in {Nil, j} -// IN /\ m.mterm <= currentTerm[i] -// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] -// \/ ~grant /\ UNCHANGED votedFor -// /\ Reply([mtype |-> RequestVoteResponse, -// mterm |-> currentTerm[i], -// mvoteGranted |-> grant, -// \* mlog is used just for the `elections' history variable for -// \* the proof. It would not exist in a real implementation. -// mlog |-> log[i], -// msource |-> i, -// mdest |-> j], -// m) -// /\ UNCHANGED <> - + // TLA+ Spec + // HandleRequestVoteRequest(i, j, m) == + // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) + // \/ /\ m.mlastLogTerm = LastTerm(log[i]) + // /\ m.mlastLogIndex >= Len(log[i]) + // grant == /\ m.mterm = currentTerm[i] + // /\ logOk + // /\ votedFor[i] \in {Nil, j} + // IN /\ m.mterm <= currentTerm[i] + // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] + // \/ ~grant /\ UNCHANGED votedFor + // /\ Reply([mtype |-> RequestVoteResponse, + // mterm |-> currentTerm[i], + // mvoteGranted |-> grant, + // \* mlog is used just for the `elections' history variable for + // \* the proof. It would not exist in a real implementation. + // mlog |-> log[i], + // msource |-> i, + // mdest |-> j], + // m) + // /\ UNCHANGED <> } diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index ba9787f00c..a9c88a7975 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -14,25 +14,22 @@ */ #include "syncRequestVoteReply.h" -#include "sync.h" void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { - -// TLA+ Spec -//HandleRequestVoteResponse(i, j, m) == -// \* This tallies votes even when the current state is not Candidate, but -// \* they won't be looked at, so it doesn't matter. -// /\ m.mterm = currentTerm[i] -// /\ votesResponded' = [votesResponded EXCEPT ![i] = -// votesResponded[i] \cup {j}] -// /\ \/ /\ m.mvoteGranted -// /\ votesGranted' = [votesGranted EXCEPT ![i] = -// votesGranted[i] \cup {j}] -// /\ voterLog' = [voterLog EXCEPT ![i] = -// voterLog[i] @@ (j :> m.mlog)] -// \/ /\ ~m.mvoteGranted -// /\ UNCHANGED <> -// /\ Discard(m) -// /\ UNCHANGED <> - + // TLA+ Spec + // HandleRequestVoteResponse(i, j, m) == + // \* This tallies votes even when the current state is not Candidate, but + // \* they won't be looked at, so it doesn't matter. + // /\ m.mterm = currentTerm[i] + // /\ votesResponded' = [votesResponded EXCEPT ![i] = + // votesResponded[i] \cup {j}] + // /\ \/ /\ m.mvoteGranted + // /\ votesGranted' = [votesGranted EXCEPT ![i] = + // votesGranted[i] \cup {j}] + // /\ voterLog' = [voterLog EXCEPT ![i] = + // voterLog[i] @@ (j :> m.mlog)] + // \/ /\ ~m.mvoteGranted + // /\ UNCHANGED <> + // /\ Discard(m) + // /\ UNCHANGED <> } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8a27f097d1..da194780ff 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,7 +14,6 @@ */ #include "syncSnapshot.h" -#include "sync.h" #include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 206dd70046..e27df55d07 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -14,6 +14,5 @@ */ #include "syncTimeout.h" -#include "sync.h" void onTimeout(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c new file mode 100644 index 0000000000..02cf4ac033 --- /dev/null +++ b/source/libs/sync/src/syncVoteMgr.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncVoteMgr.h" diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt new file mode 100644 index 0000000000..e655ac01be --- /dev/null +++ b/source/libs/sync/test/CMakeLists.txt @@ -0,0 +1,55 @@ +add_executable(syncTest "") +add_executable(syncEnvTest "") +add_executable(syncPingTest "") + + +target_sources(syncTest + PRIVATE + "syncTest.cpp" +) +target_sources(syncEnvTest + PRIVATE + "syncEnvTest.cpp" +) +target_sources(syncPingTest + PRIVATE + "syncPingTest.cpp" +) + + +target_include_directories(syncTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncEnvTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncPingTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + + +target_link_libraries(syncTest + sync + gtest_main +) +target_link_libraries(syncEnvTest + sync + gtest_main +) +target_link_libraries(syncPingTest + sync + gtest_main +) + + +enable_testing() +add_test( + NAME sync_test + COMMAND syncTest +) diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp new file mode 100644 index 0000000000..1d050e7094 --- /dev/null +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -0,0 +1,56 @@ +#include "syncEnv.h" +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void doSync() { + SSyncInfo syncInfo; + syncInfo.vgId = 1; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); +} + +int main() { + //taosInitLog((char*)"syncEnvTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret = syncIOStart(); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + doSync(); + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp new file mode 100644 index 0000000000..e62d051946 --- /dev/null +++ b/source/libs/sync/test/syncPingTest.cpp @@ -0,0 +1,65 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void doSync() { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = 0; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = 7010; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = 7110; + taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = 7210; + taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; +} + +int main() { + //taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret = syncIOStart(); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + doSync(); + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 47566d537e..0f72fd822f 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,7 +1,58 @@ #include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "gtest/gtest.h" -int main() { - printf("test \n"); - return 0; +void *pingFunc(void *param) { + SSyncIO *io = (SSyncIO *)param; + while (1) { + sDebug("io->ping"); + io->ping(io); + sleep(1); + } + return NULL; } +int main() { + //taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + sTrace("sync log test: trace"); + sDebug("sync log test: debug"); + sInfo("sync log test: info"); + sWarn("sync log test: warn"); + sError("sync log test: error"); + sFatal("sync log test: fatal"); + + SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); + // assert(pRaftStore != NULL); + + // raftStorePrint(pRaftStore); + + // pRaftStore->currentTerm = 100; + // pRaftStore->voteFor.addr = 200; + // pRaftStore->voteFor.vgId = 300; + + // raftStorePrint(pRaftStore); + + // raftStorePersist(pRaftStore); + + // sDebug("sync test"); + + // SSyncIO *syncIO = syncIOCreate(); + // assert(syncIO != NULL); + + // syncIO->start(syncIO); + + // sleep(2); + + // pthread_t tid; + // pthread_create(&tid, NULL, pingFunc, syncIO); + + // while (1) { + // sleep(1); + // } + return 0; +} diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index c34513a2f1..2d8314058e 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -240,6 +240,7 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { return 0; } fflush((*ppFile)->fp); + fsync((*ppFile)->fd); close((*ppFile)->fd); (*ppFile)->fd = -1; (*ppFile)->fp = NULL; @@ -295,12 +296,16 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { if (errno == EINTR) { continue; } + fflush(pFile->fp); + fsync(pFile->fd); return -1; } nleft -= nwritten; tbuf += nwritten; } + fflush(pFile->fp); + fsync(pFile->fd); return count; } diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 13367843fc..0c9d32ea13 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -36,6 +36,10 @@ int32_t tjsonAddDoubleToObject(SJson* pJson, const char* pName, const double num return (NULL == cJSON_AddNumberToObject((cJSON*)pJson, pName, number) ? TSDB_CODE_FAILED : TSDB_CODE_SUCCESS); } +int32_t tjsonAddBoolToObject(SJson* pJson, const char* pName, const bool boolean) { + return (NULL == cJSON_AddBoolToObject((cJSON*)pJson, pName, boolean) ? TSDB_CODE_FAILED : TSDB_CODE_SUCCESS); +} + int32_t tjsonAddStringToObject(SJson* pJson, const char* pName, const char* pVal) { return (NULL == cJSON_AddStringToObject((cJSON*)pJson, pName, pVal) ? TSDB_CODE_FAILED : TSDB_CODE_SUCCESS); } @@ -81,4 +85,113 @@ char* tjsonToString(const SJson* pJson) { char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); -} \ No newline at end of file +} + + +SJson* tjsonGetObjectItem(const SJson* pJson, const char* pName) { + return cJSON_GetObjectItem(pJson, pName); +} + +int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal) { + char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); + if (NULL == p) { + return TSDB_CODE_FAILED; + } + strcpy(pVal, p); + return TSDB_CODE_SUCCESS; +} + +int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal) { + char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); + if (NULL == p) { + return TSDB_CODE_FAILED; + } + *pVal = strdup(p); + return TSDB_CODE_SUCCESS; +} + +int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal) { + char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); + if (NULL == p) { + return TSDB_CODE_FAILED; + } + char* pEnd = NULL; + *pVal = strtol(p, &pEnd, 10); + return (NULL == pEnd ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); +} + +int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal) { + int64_t val = 0; + int32_t code = tjsonGetBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + +int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pVal) { + int64_t val = 0; + int32_t code = tjsonGetBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + +int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal) { + int64_t val = 0; + int32_t code = tjsonGetBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + +int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal) { + char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); + if (NULL == p) { + return TSDB_CODE_FAILED; + } + char* pEnd = NULL; + *pVal = strtoul(p, &pEnd, 10); + return (NULL == pEnd ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED); +} + +int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { + uint64_t val = 0; + int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + +int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal) { + const SJson* pObject = tjsonGetObjectItem(pJson, pName); + if (cJSON_IsBool(pObject)) { + return TSDB_CODE_FAILED; + } + *pVal = cJSON_IsTrue(pObject) ? true : false; + return TSDB_CODE_SUCCESS; +} + +int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal) { + const SJson* pObject = tjsonGetObjectItem(pJson, pName); + if (!cJSON_IsNumber(pObject)) { + return TSDB_CODE_FAILED; + } + *pVal = cJSON_GetNumberValue(pObject); + return TSDB_CODE_SUCCESS; +} + +int32_t tjsonGetArraySize(const SJson* pJson) { + return cJSON_GetArraySize(pJson); +} + +SJson* tjsonGetArrayItem(const SJson* pJson, int32_t index) { + return cJSON_GetArrayItem(pJson, index); +} + +int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj) { + SJson* pJsonObj = tjsonGetObjectItem(pJson, pName); + if (NULL == pJsonObj) { + return TSDB_CODE_FAILED; + } + return func(pJsonObj, pObj); +} + +SJson* tjsonParse(const char* pStr) { + return cJSON_Parse(pStr); +} diff --git a/tests b/tests index 12233db374..904e6f0e15 160000 --- a/tests +++ b/tests @@ -1 +1 @@ -Subproject commit 12233db374f1fe97b327e89a3442c631578ad38d +Subproject commit 904e6f0e152e8fe61edfe0a0a9ae497cfde2a72c