Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-20 05:13:12 +00:00
commit a889ac3edc
37 changed files with 795 additions and 478 deletions

View File

@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode {
SNodeList* pFuncs; SNodeList* pFuncs;
STimeWindow timeRange; STimeWindow timeRange;
int64_t interval; int64_t interval;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
} SInterpFuncLogicNode; } SInterpFuncLogicNode;
typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType;
@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode {
SNodeList* pFuncs; SNodeList* pFuncs;
STimeWindow timeRange; STimeWindow timeRange;
int64_t interval; int64_t interval;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
} SInterpFuncPhysiNode; } SInterpFuncPhysiNode;
typedef struct SJoinPhysiNode { typedef struct SJoinPhysiNode {

View File

@ -107,7 +107,9 @@ typedef struct SSyncFSM {
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
@ -193,8 +195,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);

View File

@ -565,6 +565,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A) #define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A)
#define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B) #define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B)
#define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C) #define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C)
#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)

View File

@ -73,7 +73,17 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
} }
} }
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
// TODO:
// atomic operation
// step1. sdbGetCommitInfo
// step2. create ppReader with pReaderParam
return 0;
}
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0; return 0;
@ -159,6 +169,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpReConfigCb = mndReConfig; pFsm->FpReConfigCb = mndReConfig;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
pFsm->FpSnapshotStartRead = mndSnapshotStartRead; pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
pFsm->FpSnapshotStopRead = mndSnapshotStopRead; pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
pFsm->FpSnapshotDoRead = mndSnapshotDoRead; pFsm->FpSnapshotDoRead = mndSnapshotDoRead;

View File

@ -353,8 +353,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0}; char logBuf[256] = {0};
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
(*pFsm->FpGetSnapshot)(pFsm, &snapshot); (*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -416,7 +416,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = NULL;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;

View File

@ -2892,7 +2892,10 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId); SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId);
int32_t chWinSize = taosArrayGetSize(pChWins); int32_t chWinSize = taosArrayGetSize(pChWins);
int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey); int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey);
for (int32_t k = index; k > 0 && k < chWinSize; k++) { if (index < 0) {
index = 0;
}
for (int32_t k = index; k < chWinSize; k++) {
SResultWindowInfo* pcw = taosArrayGet(pChWins, k); SResultWindowInfo* pcw = taosArrayGet(pChWins, k);
if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) {
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;

View File

@ -1481,11 +1481,13 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) { if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) {
*(double*)&pDBuf->v = *(double*)&pSBuf->v; *(double*)&pDBuf->v = *(double*)&pSBuf->v;
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
pDBuf->assign = true;
} }
} else { } else {
if (pSBuf->assign && (((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign)) { if (pSBuf->assign && (((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign)) {
pDBuf->v = pSBuf->v; pDBuf->v = pSBuf->v;
replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos);
pDBuf->assign = true;
} }
} }
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
@ -1745,10 +1747,10 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (IS_INTEGER_TYPE(type)) { if (IS_INTEGER_TYPE(type)) {
avg = pStddevRes->isum / ((double)pStddevRes->count); avg = pStddevRes->isum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); pStddevRes->result = sqrt(fabs(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg));
} else { } else {
avg = pStddevRes->dsum / ((double)pStddevRes->count); avg = pStddevRes->dsum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg); pStddevRes->result = sqrt(fabs(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg));
} }
return functionFinalize(pCtx, pBlock); return functionFinalize(pCtx, pBlock);

View File

@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL
CLONE_NODE_LIST_FIELD(pFuncs); CLONE_NODE_LIST_FIELD(pFuncs);
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
COPY_SCALAR_FIELD(interval); COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(fillMode);
CLONE_NODE_FIELD(pFillValues);
CLONE_NODE_FIELD(pTimeSeries);
return (SNode*)pDst; return (SNode*)pDst;
} }

View File

@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs";
static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime";
static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime";
static const char* jkInterpFuncPhysiPlanInterval = "Interval"; static const char* jkInterpFuncPhysiPlanInterval = "Interval";
static const char* jkInterpFuncPhysiPlanFillMode = "FillMode";
static const char* jkInterpFuncPhysiPlanFillValues = "FillValues";
static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries";
static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj;
@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries);
}
return code; return code;
} }
@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval);
} }
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries);
}
return code; return code;
} }

View File

@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) {
SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) {
((SSelectStmt*)pStmt)->pFill = pFill; SFillNode* pFillClause = (SFillNode*)pFill;
nodesDestroyNode(pFillClause->pWStartTs);
pFillClause->pWStartTs = createPrimaryKeyCol(pCxt);
((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause;
} }
return pStmt; return pStmt;
} }
@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) {
pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK;
pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK;
pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL;
pOptions->commentNull = true; // mark null pOptions->commentNull = true; // mark null
return (SNode*)pOptions; return (SNode*)pOptions;
} }
@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) {
STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS);
CHECK_OUT_OF_MEM(pOptions); CHECK_OUT_OF_MEM(pOptions);
pOptions->ttl = -1; pOptions->ttl = -1;
pOptions->commentNull = true; // mark null pOptions->commentNull = true; // mark null
return (SNode*)pOptions; return (SNode*)pOptions;
} }
@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
case TABLE_OPTION_ROLLUP: case TABLE_OPTION_ROLLUP:
((STableOptions*)pOptions)->pRollupFuncs = pVal; ((STableOptions*)pOptions)->pRollupFuncs = pVal;
break; break;
case TABLE_OPTION_TTL:{ case TABLE_OPTION_TTL: {
int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10); int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10);
if (ttl > INT32_MAX){ if (ttl > INT32_MAX) {
ttl = INT32_MAX; ttl = INT32_MAX;
} }
// ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER) // ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER)

View File

@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin
return code; return code;
} }
static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) {
SFillNode* pFill = (SFillNode*)pInterval->pFill; if (FILL_MODE_NONE == pFill->mode) {
return TSDB_CODE_SUCCESS;
}
if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) ||
TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
} }
int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); // interp FILL clause
int64_t intervalRange = 0; if (NULL == pInterval) {
SValueNode* pInter = (SValueNode*)pInterval->pInterval; return TSDB_CODE_SUCCESS;
if (TIME_IS_VAR_DURATION(pInter->unit)) { }
int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey);
int64_t intervalRange = 0;
if (TIME_IS_VAR_DURATION(pInterval->unit)) {
int64_t f = 1; int64_t f = 1;
if (pInter->unit == 'n') { if (pInterval->unit == 'n') {
f = 30L * MILLISECOND_PER_DAY; f = 30L * MILLISECOND_PER_DAY;
} else if (pInter->unit == 'y') { } else if (pInterval->unit == 'y') {
f = 365L * MILLISECOND_PER_DAY; f = 365L * MILLISECOND_PER_DAY;
} }
intervalRange = pInter->datum.i * f; intervalRange = pInterval->datum.i * f;
} else { } else {
intervalRange = pInter->datum.i; intervalRange = pInterval->datum.i;
} }
if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi
int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange)); int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange));
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkFill(pCxt, pInterval); code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval);
} }
return code; return code;
} }
@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
return code; return code;
} }
static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) {
SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL);
if (NULL == pFill) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pFill->mode = FILL_MODE_NONE;
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
nodesDestroyNode((SNode*)pFill);
return TSDB_CODE_OUT_OF_MEMORY;
}
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME);
pFill->pWStartTs = (SNode*)pCol;
*pOutput = (SNode*)pFill;
return TSDB_CODE_SUCCESS;
}
static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pSelect->pFill) {
code = createDefaultFillNode(pCxt, &pSelect->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pSelect->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange));
}
if (TSDB_CODE_SUCCESS == code) {
code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery);
}
return code;
}
static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasInterpFunc) {
if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE);
}
return TSDB_CODE_SUCCESS;
}
int32_t code = translateExpr(pCxt, &pSelect->pRange);
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pSelect->pEvery);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateInterpFill(pCxt, pSelect);
}
return code;
}
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {
pCxt->currClause = SQL_CLAUSE_PARTITION_BY; pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
return translateExprList(pCxt, pPartitionByList); return translateExprList(pCxt, pPartitionByList);
@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkLimit(pCxt, pSelect); code = checkLimit(pCxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == code) {
code = translateInterp(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteUniqueStmt(pCxt, pSelect); code = rewriteUniqueStmt(pCxt, pSelect);
} }
@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->delay2 = pStmt->pOptions->maxDelay2;
pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark1 = pStmt->pOptions->watermark1;
pReq->watermark2 = pStmt->pOptions->watermark2; pReq->watermark2 = pStmt->pOptions->watermark2;
// pReq->ttl = pStmt->pOptions->ttl; // pReq->ttl = pStmt->pOptions->ttl;
columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pCols, &pReq->pColumns);
columnDefNodeToField(pStmt->pTags, &pReq->pTags); columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags); pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
if(pStmt->pOptions->commentNull == false){ if (pStmt->pOptions->commentNull == false) {
pReq->comment = strdup(pStmt->pOptions->comment); pReq->comment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->comment) { if (NULL == pReq->comment) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->commentLen = strlen(pStmt->pOptions->comment); pReq->commentLen = strlen(pStmt->pOptions->comment);
}else{ } else {
pReq->commentLen = -1; pReq->commentLen = -1;
} }
@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
pAlterReq->alterType = pStmt->alterType; pAlterReq->alterType = pStmt->alterType;
if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) {
// pAlterReq->ttl = pStmt->pOptions->ttl; // pAlterReq->ttl = pStmt->pOptions->ttl;
if (pStmt->pOptions->commentNull == false) { if (pStmt->pOptions->commentNull == false) {
pAlterReq->comment = strdup(pStmt->pOptions->comment); pAlterReq->comment = strdup(pStmt->pOptions->comment);
if (NULL == pAlterReq->comment) { if (NULL == pAlterReq->comment) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pAlterReq->commentLen = strlen(pStmt->pOptions->comment); pAlterReq->commentLen = strlen(pStmt->pOptions->comment);
}else{ } else {
pAlterReq->commentLen = -1; pAlterReq->commentLen = -1;
} }
@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
req.commentLen = strlen(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment);
}else{ } else {
req.commentLen = -1; req.commentLen = -1;
} }
req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols);
@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
struct SVCreateTbReq req = {0}; struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE; req.type = TD_CHILD_TABLE;
req.name = strdup(pStmt->tableName); req.name = strdup(pStmt->tableName);
req.ttl = pStmt->pOptions->ttl; req.ttl = pStmt->pOptions->ttl;
if (pStmt->pOptions->commentNull == false) { if (pStmt->pOptions->commentNull == false) {
req.comment = strdup(pStmt->pOptions->comment); req.comment = strdup(pStmt->pOptions->comment);
req.commentLen = strlen(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment);
} else{ } else {
req.commentLen = -1; req.commentLen = -1;
} }
req.ctb.suid = suid; req.ctb.suid = suid;
@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p
pReq->newTTL = pStmt->pOptions->ttl; pReq->newTTL = pStmt->pOptions->ttl;
} }
if (TSDB_CODE_SUCCESS == code){ if (TSDB_CODE_SUCCESS == code) {
if(pStmt->pOptions->commentNull == false) { if (pStmt->pOptions->commentNull == false) {
pReq->newComment = strdup(pStmt->pOptions->comment); pReq->newComment = strdup(pStmt->pOptions->comment);
if (NULL == pReq->newComment) { if (NULL == pReq->newComment) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
pReq->newCommentLen = strlen(pReq->newComment); pReq->newCommentLen = strlen(pReq->newComment);
} } else {
else{
pReq->newCommentLen = -1; pReq->newCommentLen = -1;
} }
} }

View File

@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "%s function does not supportted in group query"; return "%s function does not supportted in group query";
case TSDB_CODE_PAR_INVALID_TABLE_OPTION: case TSDB_CODE_PAR_INVALID_TABLE_OPTION:
return "Invalid option %s"; return "Invalid option %s";
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
case TSDB_CODE_OUT_OF_MEMORY: case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory"; return "Out of memory";
default: default:

View File

@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) {
run("SELECT INTERP(c1) FROM t1 EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 EVERY(5s)");
run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)");
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)");
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");

View File

@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) {
// SRangeNode* pRange = (SRangeNode*)pSelect->pRange; SFillNode* pFill = (SFillNode*)pSelect->pFill;
// pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i; pInterpFunc->timeRange = pFill->timeRange;
// pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i; pInterpFunc->fillMode = pFill->mode;
pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs);
pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues);
if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) {

View File

@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->timeRange = pFuncLogicNode->timeRange;
pInterpFunc->interval = pFuncLogicNode->interval; pInterpFunc->interval = pFuncLogicNode->interval;
pInterpFunc->fillMode = pFuncLogicNode->fillMode;
pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues);
pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries);
if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -238,6 +238,9 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
void syncStartNormal(int64_t rid); void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid); void syncStartStandBy(int64_t rid);

View File

@ -497,7 +497,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do { do {
SyncIndex myLastIndex = syncNodeGetLastIndex(ths); SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths); syncNodeHasSnapshot(ths);
@ -710,7 +710,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first // advance commit index to sanpshot first
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex; SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;

View File

@ -123,7 +123,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) { if (gRaftDetailLog) {
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm); snapshot.lastApplyIndex, snapshot.lastApplyTerm);
} }
@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
bool hasSnapshot = syncNodeHasSnapshot(ths); bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
// start sending snapshot first time // start sending snapshot first time
// start here, stop by receiver // start here, stop by receiver
@ -209,7 +209,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) { if (gRaftDetailLog) {
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm); snapshot.lastApplyIndex, snapshot.lastApplyTerm);
} }

View File

@ -50,7 +50,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// advance commit index to sanpshot first // advance commit index to sanpshot first
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) { if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
SyncIndex commitBegin = pSyncNode->commitIndex; SyncIndex commitBegin = pSyncNode->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;

View File

@ -809,9 +809,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
// pSyncNode->pSnapshot = NULL; // pSyncNode->pSnapshot = NULL;
// if (pSyncNode->pFsm->FpGetSnapshot != NULL) { // if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
// pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); // pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
// pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); // pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot);
// } // }
// tsem_init(&(pSyncNode->restoreSem), 0, 0); // tsem_init(&(pSyncNode->restoreSem), 0, 0);
@ -1658,8 +1658,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
ret = true; ret = true;
} }
@ -1669,19 +1669,19 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(syncNodeHasSnapshot(pSyncNode)); ASSERT(syncNodeHasSnapshot(pSyncNode));
ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL);
ASSERT(index >= SYNC_INDEX_BEGIN); ASSERT(index >= SYNC_INDEX_BEGIN);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
bool b = (index <= snapshot.lastApplyIndex); bool b = (index <= snapshot.lastApplyIndex);
return b; return b;
} }
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -1694,8 +1694,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
if (syncNodeHasSnapshot(pSyncNode)) { if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot // has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -1747,8 +1747,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if (syncNodeHasSnapshot(pSyncNode)) { if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot // has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
if (index > snapshot.lastApplyIndex + 1) { if (index > snapshot.lastApplyIndex + 1) {

View File

@ -124,7 +124,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
if (gRaftDetailLog) { if (gRaftDetailLog) {
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm); snapshot.lastApplyIndex, snapshot.lastApplyTerm);
} }

View File

@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->replicaIndex = replicaIndex; pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = taosGetTimestampMs() + 100; pSender->privateTerm = taosGetTimestampMs() + 100;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false; pSender->finish = false;
} else { } else {
sError("snapshotSenderCreate cannot create sender"); sError("snapshotSenderCreate cannot create sender");
@ -84,7 +84,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->blockLen = 0; pSender->blockLen = 0;
// get current snapshot info // get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld", sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
@ -558,7 +558,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} }
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
do { do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish"); char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish");

View File

@ -36,9 +36,9 @@ void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -159,7 +159,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpSnapshotStartRead = SnapshotStartRead; pFsm->FpSnapshotStartRead = SnapshotStartRead;
pFsm->FpSnapshotStopRead = SnapshotStopRead; pFsm->FpSnapshotStopRead = SnapshotStopRead;

View File

@ -35,9 +35,9 @@ void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -90,7 +90,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpReConfigCb = ReConfigCb; pFsm->FpReConfigCb = ReConfigCb;

View File

@ -54,7 +54,7 @@ void init() {
pSyncNode->pWal = pWal; pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
void cleanup() { void cleanup() {

View File

@ -54,7 +54,7 @@ void init() {
pSyncNode->pWal = pWal; pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
void cleanup() { void cleanup() {
@ -80,7 +80,7 @@ void test1() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -146,7 +146,7 @@ void test2() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -203,7 +203,7 @@ void test3() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -268,7 +268,7 @@ void test4() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -335,7 +335,7 @@ void test5() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);

View File

@ -32,9 +32,9 @@ void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -75,7 +75,7 @@ SSyncFSM* createFsm() {
pFsm->FpCommitCb = CommitCb; pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
return pFsm; return pFsm;
} }

View File

@ -40,7 +40,7 @@ SSyncSnapshotSender* createSender() {
pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead; pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead;
pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead; pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead;
pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead; pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead;
pSyncNode->pFsm->FpGetSnapshot = GetSnapshot; pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot;
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2); SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2);
pSender->start = true; pSender->start = true;

View File

@ -35,9 +35,9 @@ const char *pWalDir = "./syncSnapshotTest_wal";
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -79,7 +79,7 @@ void initFsm() {
pFsm->FpCommitCb = CommitCb; pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {

View File

@ -172,7 +172,7 @@ SSyncFSM* createFsm() {
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpReConfigCb = ReConfigCb; pFsm->FpReConfigCb = ReConfigCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpSnapshotStartRead = SnapshotStartRead; pFsm->FpSnapshotStartRead = SnapshotStartRead;

View File

@ -11,13 +11,20 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from collections import defaultdict
import random import random
import string import string
from util.sql import tdSql
from util.dnodes import tdDnodes
import requests import requests
import time import time
import socket import socket
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
class TDCom: class TDCom:
def init(self, conn, logSql): def init(self, conn, logSql):
tdSql.init(conn.cursor(), logSql) tdSql.init(conn.cursor(), logSql)
@ -170,4 +177,122 @@ class TDCom:
def close(self): def close(self):
self.cursor.close() self.cursor.close()
def create_database(self,tsql, dbName='test',dropFlag=1,precision="ms", **kwargs):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
'''
vgroups replica precision strict wal fsync comp cachelast single_stable buffer pagesize pages minrows maxrows duration keep retentions
'''
sqlString = f'create database if not exists {dbName} precision "{precision}" vgroups 4'
if len(kwargs) > 0:
dbParams = ""
for param, value in kwargs.items():
dbParams += f'{param} {value} '
sqlString += f'{dbParams}'
tsql.execute(sqlString)
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, dbName,stbName,columnDict,tagDict):
colSchema = ''
for i in range(columnDict['int']):
colSchema += ', c%d int'%i
tagSchema = ''
for i in range(tagDict['int']):
if i > 0:
tagSchema += ','
tagSchema += 't%d int'%i
tsql.execute("create table if not exists %s.%s (ts timestamp %s) tags(%s)"%(dbName, stbName, colSchema, tagSchema))
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
return
def create_ctables(self,tsql, dbName,stbName,ctbNum,tagDict):
tsql.execute("use %s" %dbName)
tagsValues = ''
for i in range(tagDict['int']):
if i > 0:
tagsValues += ','
tagsValues += '%d'%i
pre_create = "create table"
sql = pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for i in range(ctbNum):
sql += " %s_%d using %s tags(%s)"%(stbName,i,stbName,tagsValues)
if (i > 0) and (i%100 == 0):
tsql.execute(sql)
sql = pre_create
if sql != pre_create:
tsql.execute(sql)
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
if startTs == 0:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, %d)"%(startTs + j, j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def getClientCfgPath(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
return cfgPath
def newcur(self,host='localhost',port=6030,user='root',password='taosdata'):
cfgPath = self.getClientCfgPath()
con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port)
cur=con.cursor()
print(cur)
return cur
def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata'):
newTdSql = TDSql()
cur = self.newcur(host=host,port=port,user=user,password=password)
newTdSql.init(cur, False)
return newTdSql
tdCom = TDCom() tdCom = TDCom()

View File

@ -73,6 +73,7 @@
./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic2.sim
# ./test.sh -f tsim/stream/distributeInterval0.sim # ./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
# ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim # ./test.sh -f tsim/stream/state0.sim

View File

@ -0,0 +1,58 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t1 trigger at_once into streamtST as select _wstartts, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ;
sleep 1000
sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1);
sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3);
sql insert into ts1 values(1648791211005,1,1,1);
sql insert into ts2 values(1648791221006,5,5,5) (1648791221007,5,5,5);
sql insert into ts2 values(1648791221008,5,5,5) (1648791221008,5,5,5)(1648791221006,5,5,5);
sql insert into ts1 values(1648791231000,1,1,1) (1648791231002,1,1,1) (1648791231006,1,1,1);
sql insert into ts1 values(1648791211000,6,6,6) (1648791231002,2,2,2);
sql insert into ts1 values(1648791211002,7,7,7);
sql insert into ts1 values(1648791211002,7,7,7) ts2 values(1648791221008,5,5,5) ;
$loop_count = 0
loop1:
sql select * from streamtST;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 10 then
print =====data01=$data01
goto loop1
endi
if $data02 != 34 then
print =====data02=$data02
goto loop1
endi
if $data03 != 7 then
print ======$data03
return -1
endi
system sh/stop_dnodes.sh

View File

@ -4,394 +4,163 @@ from util.dnodes import *
from util.log import * from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
import datetime
class TDTestCase: class TDTestCase:
updatecfgDict = {'rpcDebugFlag': '143'}
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
self.today_date = datetime.datetime.strptime(
datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
self.time_unit = ['b','u','a','s','m','h','d','w']
self.error_param = ['1.5','abc','!@#','"abc"','today()']
self.arithmetic_operators = ['+','-','*','/']
self.relational_operator = ['<','<=','=','>=','>']
# prepare data
self.ntbname = 'ntb'
self.stbname = 'stb'
self.column_dict = {
'ts':'timestamp',
'c1':'int',
'c2':'float',
'c3':'double',
'c4':'timestamp'
}
self.tag_dict = {
't0':'int'
}
self.tbnum = 2
self.tag_values = [
f'10',
f'100'
]
self.values_list = [f'now,1,1.55,100.555555,today()',
f'now+1d,10,11.11,99.999999,now()',
f'today(),3,3.333,333.333333,now()',
f'today()-1d,10,11.11,99.999999,now()',
f'today()+1d,1,1.55,100.555555,today()']
def set_create_normaltable_sql(self, ntbname, column_dict):
column_sql = ''
for k, v in column_dict.items():
column_sql += f"{k} {v},"
create_ntb_sql = f'create table {ntbname} ({column_sql[:-1]})'
return create_ntb_sql
def set_create_stable_sql(self,stbname,column_dict,tag_dict):
column_sql = ''
tag_sql = ''
for k,v in column_dict.items():
column_sql += f"{k} {v},"
for k,v in tag_dict.items():
tag_sql += f"{k} {v},"
create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})'
return create_stb_sql
def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'):
for k,v in column_dict.items():
num_up = 0
num_down = 0
num_same = 0
if v.lower() == 'timestamp':
tdSql.query(f'select {k} from {tbname}')
for i in tdSql.queryResult:
if i[0] > self.today_date:
num_up += 1
elif i[0] == self.today_date:
num_same += 1
elif i[0] < self.today_date:
num_down += 1
tdSql.query(f"select today() from {tbname}")
tdSql.checkRows(len(values_list)*tb_num)
tdSql.checkData(0, 0, str(self.today_date))
tdSql.query(f"select * from {tbname} where {k}=today()")
if tb == 'tb':
tdSql.checkRows(num_same*tb_num)
elif tb == 'stb':
tdSql.checkRows(num_same)
for i in [f'{tbname}',f'db.{tbname}']:
for unit in self.time_unit:
for symbol in ['+','-']:
tdSql.query(f"select today() {symbol}1{unit} from {i}")
tdSql.checkRows(len(values_list)*tb_num)
for unit in self.error_param:
for symbol in self.arithmetic_operators:
tdSql.error(f'select today() {symbol}{unit} from {i}')
for symbol in self.arithmetic_operators:
tdSql.query(f'select now(){symbol}null from {i}')
tdSql.checkData(0,0,None)
for symbol in self.relational_operator:
tdSql.query(f'select * from {i} where {k} {symbol} today()')
if symbol == '<' :
if tb == 'tb':
tdSql.checkRows(num_down*tb_num)
elif tb == 'stb':
tdSql.checkRows(num_down)
elif symbol == '<=':
if tb == 'tb':
tdSql.checkRows((num_same+num_down)*tb_num)
elif tb == 'stb':
tdSql.checkRows(num_same+num_down)
elif symbol == '=':
if tb == 'tb':
tdSql.checkRows(num_same*tb_num)
elif tb == 'stb':
tdSql.checkRows(num_same)
elif symbol == '>=':
if tb == 'tb':
tdSql.checkRows((num_up + num_same)*tb_num)
elif tb == 'stb':
tdSql.checkRows(num_up + num_same)
elif symbol == '>':
if tb == 'tb':
tdSql.checkRows(num_up*tb_num)
elif tb == 'stb':
tdSql.checkRows(num_up)
tdSql.query(f"select today()/0 from {tbname}")
tdSql.checkRows(len(values_list)*tb_num)
tdSql.checkData(0,0,None)
tdSql.query(f"select {k} from {tbname} where {k}=today()")
if tb == 'tb':
tdSql.checkRows(num_same*tb_num)
for i in range(num_same*tb_num):
tdSql.checkData(i, 0, str(self.today_date))
elif tb == 'stb':
tdSql.checkRows(num_same)
for i in range(num_same):
tdSql.checkData(i, 0, str(self.today_date))
def today_check_ntb(self):
tdSql.prepare()
tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict))
for i in self.values_list:
tdSql.execute(
f'insert into {self.ntbname} values({i})')
self.data_check(self.column_dict,self.ntbname,self.values_list)
tdSql.execute('drop database db')
def today_check_stb_tb(self):
tdSql.prepare()
tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
for i in range(self.tbnum):
tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})')
for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values ({j})')
# check child table
for i in range(self.tbnum):
self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list)
# check stable
self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb')
tdSql.execute('drop database db')
def run(self): # sourcery skip: extract-duplicate-method def run(self): # sourcery skip: extract-duplicate-method
# for func now() , today(), timezone()
tdSql.prepare()
today_date = datetime.datetime.strptime(
datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d")
tdLog.printNoPrefix("==========step1:create tables==========") tdLog.printNoPrefix("==========check today() for normal table ==========")
tdSql.execute( self.today_check_ntb()
'''create table if not exists ntb tdLog.printNoPrefix("==========check today() for stable and child table==========")
(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) self.today_check_stb_tb()
'''
)
tdSql.execute(
'''create table if not exists stb
(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int)
'''
)
tdSql.execute(
'''create table if not exists stb_1 using stb tags(100)
'''
)
tdLog.printNoPrefix("==========step2:insert data into ntb==========")
tdSql.execute(
'insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
tdSql.execute(
'insert into stb_1 values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
tdLog.printNoPrefix("==========step2:query test of ntb ==========")
# test function today()
# normal table
tdSql.query("select today() from ntb")
tdSql.checkRows(3)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() +1w from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1w from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1d from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1d from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1h from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1h from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1m from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1m from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1s from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1s from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1a from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1a from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1u from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1u from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1b from ntb")
tdSql.checkRows(3)
tdSql.query("select today() +1b from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1w from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1w from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1d from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1d from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1h from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1h from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1m from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1m from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1s from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1s from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1a from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1a from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1u from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1u from db.ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1b from ntb")
tdSql.checkRows(3)
tdSql.query("select today() -1b from db.ntb")
tdSql.checkRows(3)
tdSql.query("select * from ntb where ts=today()")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 3)
tdSql.query("select * from ntb where ts<=today()")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 10)
# for bug
# tdSql.query("select * from ntb where ts<today()")
# tdSql.checkRows(1)
tdSql.query("select * from ntb where ts>=today()")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 3)
# tdSql.query("select * from ntb where ts>today()")
# tdSql.checkRows(1)
tdSql.query("select c4 from ntb where c4=today()")
tdSql.checkRows(1)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() from ntb where ts<now()")
tdSql.checkRows(3)
tdSql.checkData(0, 0, str(today_date))
tdSql.error("select today()+1.5 from ntb")
tdSql.error("select today()-1.5 from ntb")
tdSql.error("select today()*1.5 from ntb")
tdSql.error("select today()/1.5 from ntb")
tdSql.error("select today()+1.5 from db.ntb")
tdSql.error("select today()-1.5 from db.ntb")
tdSql.error("select today()*1.5 from db.ntb")
tdSql.error("select today()/1.5 from db.ntb")
tdSql.query("select today()+null from ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()+null from db.ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()-null from ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()-null from db.ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()*null from ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()*null from db.ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()/null from ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()/null from db.ntb")
tdSql.checkData(0,0,None)
tdSql.query("select today()/0 from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.checkData(1,0,None)
tdSql.checkData(2,0,None)
# stable
tdSql.query("select today() from stb")
tdSql.checkRows(3)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() +1w from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1w from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1d from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1d from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1h from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1h from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1m from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1m from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1s from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1s from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1a from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1a from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1u from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1u from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() +1b from stb")
tdSql.checkRows(3)
tdSql.query("select today() +1b from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1w from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1w from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1d from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1d from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1h from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1h from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1m from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1m from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1s from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1s from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1a from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1a from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1u from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1u from db.stb")
tdSql.checkRows(3)
tdSql.query("select today() -1b from stb")
tdSql.checkRows(3)
tdSql.query("select today() -1b from db.stb")
tdSql.checkRows(3)
tdSql.query("select ts from stb where ts=today()")
tdSql.checkRows(1)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select ts from stb where ts<=today()")
tdSql.checkRows(2)
tdSql.error("select today()+1.5 from stb")
tdSql.error("select today()-1.5 from stb")
tdSql.error("select today()*1.5 from stb")
tdSql.error("select today()/1.5 from stb")
tdSql.query("select today()+null from stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()+null from db.stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()-null from stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()-null from db.stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()*null from stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()*null from db.stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()/null from stb")
tdSql.checkData(0,0,None)
tdSql.query("select today()/null from db.stb")
tdSql.checkData(0,0,None)
#
# tdSql.query("select * from ntb where ts<today()")
# tdSql.checkRows(1)
# tdSql.query("select * from stb where ts>=today()")
# tdSql.checkRows(2)
# tdSql.query("select * from ntb where ts>today()")
# tdSql.checkRows(1)
tdSql.query("select c4 from stb where c4=today()")
tdSql.checkRows(1)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() from stb where ts<now()")
tdSql.checkRows(3)
tdSql.query("select today()/0 from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.checkData(1,0,None)
tdSql.checkData(2,0,None)
# table
tdSql.query("select today() from stb_1")
tdSql.checkRows(3)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select today() +1w from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1w from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1d from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1d from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1h from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1h from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1m from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1m from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1s from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1s from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1a from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1a from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1u from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1u from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1b from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() +1b from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1w from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1w from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1d from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1d from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1h from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1h from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1m from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1m from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1s from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1s from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1a from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1a from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1u from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1u from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1b from stb_1")
tdSql.checkRows(3)
tdSql.query("select today() -1b from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select ts from stb_1 where ts=today()")
tdSql.checkRows(1)
tdSql.checkData(0, 0, str(today_date))
tdSql.query("select ts from stb_1 where ts<=today()")
tdSql.checkRows(2)
# for bug
tdSql.query("select * from ntb where ts<today()")
tdSql.checkRows(1)
tdSql.checkData(0,0,"2020-1-1 00:00:00")
tdSql.query("select * from stb_1 where ts>=today()")
tdSql.checkRows(2)
tdSql.query("select * from ntb where ts>today()")
tdSql.checkRows(1)
tdSql.query("select c4 from stb_1 where c4=today()")
tdSql.checkRows(1)
tdSql.query("select today() from stb_1 where ts<now()")
tdSql.checkRows(3)
tdSql.error("select today()+1.5 from stb_1")
tdSql.error("select today()-1.5 from stb_1")
tdSql.error("select today()*1.5 from stb_1")
tdSql.error("select today()/1.5 from stb_1")
tdSql.query("select today()+null from stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()+null from db.stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()-null from stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()-null from db.stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()*null from stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()*null from db.stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()/null from stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()/null from db.stb_1")
tdSql.checkData(0,0,None)
tdSql.query("select today()/0 from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.checkData(1,0,None)
tdSql.checkData(2,0,None)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,116 @@
import sys
import time
import socket
import os
import threading
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
paraDict = {'dbName': 'db12',
'dropFlag':1,
'vgroups': 4,
'precision': 'ms',
'stbName': 'stb0',
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 0, # 1640966400000 ----> 2022-01-01 00:00:00.000
'event':'',
'columnDict': {'int':2},
'tagDict': {'int':1}
}
cdbName = 'cdb'
# some parameter to consumer processor
consumerId = 0
expectrowcnt = 0
topicList = ''
ifcheckdata = 0
ifManualCommit = 1
groupId = 'group.id:cgrp1'
autoCommit = 'enable.auto.commit:false'
autoCommitInterval = 'auto.commit.interval.ms:1000'
autoOffset = 'auto.offset.reset:earliest'
pollDelay = 20
showMsg = 1
showRow = 1
hostname = socket.gethostname()
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
logSql = False
tdSql.init(conn.cursor(), logSql)
def tmqCase12(self):
tdLog.printNoPrefix("======== test case 12: ")
tdLog.info("step 1: create database, stb, ctb and insert data")
tmqCom.initConsumerTable(self.cdbName)
tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision'])
self.paraDict["stbName"] = 'stb1'
tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"])
tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"])
tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"])
self.paraDict["stbName"] = 'stb2'
tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"])
tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"])
tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"])
tdLog.info("create topics from db")
topicName1 = 'topic_%s'%(self.paraDict['dbName'])
tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName']))
topicList = topicName1
keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset)
self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2
tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tdLog.info("After waiting for a period of time, drop one stable")
time.sleep(10)
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
tdLog.info("wait result from consumer, then check it")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt):
tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt))
tdLog.exit("tmq consume rows error!")
time.sleep(15)
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 12 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase12()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -196,16 +196,16 @@ class TDTestCase:
auotCtbPrefix = 'autoCtb' auotCtbPrefix = 'autoCtb'
# create and start thread # create and start thread
parameterDict = {'cfg': '', \ parameterDict = {'cfg': '',
'actionType': 0, \ 'actionType': 0,
'dbName': 'db1', \ 'dbName': 'db1',
'dropFlag': 1, \ 'dropFlag': 1,
'vgroups': 4, \ 'vgroups': 4,
'replica': 1, \ 'replica': 1,
'stbName': 'stb1', \ 'stbName': 'stb1',
'ctbNum': 10, \ 'ctbNum': 10,
'rowsPerTbl': 10000, \ 'rowsPerTbl': 10000,
'batchNum': 100, \ 'batchNum': 100,
'startTs': 1640966400000} # 2022-01-01 00:00:00.000 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath parameterDict['cfg'] = cfgPath
@ -349,7 +349,5 @@ class TDTestCase:
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,118 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from collections import defaultdict
import random
import string
import threading
import requests
import time
# import socketfrom
import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# class actionType(Enum):
# CREATE_DATABASE = 0
# CREATE_STABLE = 1
# CREATE_CTABLE = 2
# INSERT_DATA = 3
class TMQCom:
def init(self, conn, logSql):
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
def initConsumerInfoTable(self,cdbName='cdb'):
tdLog.info("drop consumeinfo table")
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
tdLog.info("consume info sql: %s"%sql)
tdSql.query(sql)
def selectConsumeResult(self,expectRows,cdbName='cdb'):
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == expectRows:
break
else:
time.sleep(5)
for i in range(expectRows):
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
resultList.append(tdSql.getData(i , 3))
return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if (platform.system().lower() == 'windows'):
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> nul 2>&1 &"
else:
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"], paraDict['precision'])
tdCom.create_stable(tsql, paraDict["dbName"],paraDict["stbName"], paraDict["columnDict"], paraDict["tagDict"])
tdCom.create_ctables(tsql, paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["tagDict"])
if "event" in paraDict and type(paraDict['event']) == type(threading.Event()):
paraDict["event"].set()
tdCom.insert_data(tsql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
return
def threadFunction(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict)
return
def asyncCreateDbStbCtbInsertData(self, paraDict):
pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict)
pThread.start()
return pThread
def close(self):
self.cursor.close()
tmqCom = TMQCom()