Merge branch '3.0' into fix/3_liaohj
This commit is contained in:
commit
7fda9d9fcb
|
@ -176,6 +176,7 @@ typedef enum EStreamType {
|
|||
STREAM_CREATE_CHILD_TABLE,
|
||||
STREAM_TRANS_STATE,
|
||||
STREAM_MID_RETRIEVE,
|
||||
STREAM_PARTITION_DELETE_DATA,
|
||||
} EStreamType;
|
||||
|
||||
#pragma pack(push, 1)
|
||||
|
|
|
@ -153,6 +153,7 @@ typedef struct SJoinLogicNode {
|
|||
bool seqWinGroup;
|
||||
bool grpJoin;
|
||||
bool hashJoinHint;
|
||||
bool batchScanHint;
|
||||
|
||||
// FOR HASH JOIN
|
||||
int32_t timeRangeTarget; //table onCond filter
|
||||
|
|
|
@ -538,7 +538,6 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
pTmp = acquireRequest(tmpRefId);
|
||||
if (pTmp) {
|
||||
pReqList[++reqIdx] = pTmp;
|
||||
releaseRequest(tmpRefId);
|
||||
} else {
|
||||
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
|
||||
break;
|
||||
|
@ -547,6 +546,7 @@ void stopAllQueries(SRequestObj *pRequest) {
|
|||
|
||||
for (int32_t i = reqIdx; i >= 0; i--) {
|
||||
taosStopQueryImpl(pReqList[i]);
|
||||
releaseRequest(pReqList[i]->self);
|
||||
}
|
||||
|
||||
taosStopQueryImpl(pRequest);
|
||||
|
|
|
@ -1526,6 +1526,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
int32_t code = -1;
|
||||
STransAction createStreamRedoAction = {0};
|
||||
STransAction createStreamUndoAction = {0};
|
||||
STransAction dropStbUndoAction = {0};
|
||||
SMDropStbReq dropStbReq = {0};
|
||||
STrans *pTrans =
|
||||
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
|
||||
if (!pTrans) {
|
||||
|
@ -1556,7 +1558,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
|
||||
createStreamUndoAction.epSet = createStreamRedoAction.epSet;
|
||||
createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
createStreamUndoAction.actionType = TDMT_STREAM_DROP;
|
||||
createStreamUndoAction.msgType = TDMT_STREAM_DROP;
|
||||
createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
|
||||
createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
|
||||
if (!createStreamUndoAction.pCont) {
|
||||
|
@ -1569,6 +1571,24 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
dropStbReq.igNotExists = true;
|
||||
strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
|
||||
dropStbUndoAction.epSet = createStreamRedoAction.epSet;
|
||||
dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
|
||||
dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
|
||||
dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
|
||||
dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
|
||||
if (!dropStbUndoAction.pCont) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
|
||||
mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SDbObj newDb = {0};
|
||||
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
|
||||
newDb.tsmaVersion++;
|
||||
|
@ -1579,6 +1599,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
|||
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
|
||||
if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER;
|
||||
if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER;
|
||||
if (mndTransAppendUndoAction(pTrans, &dropStbUndoAction) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -1109,7 +1109,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
int32_t actionNum = taosArrayGetSize(pTrans->redoActions);
|
||||
int32_t actionNum = taosArrayGetSize(pArray);
|
||||
if (action < 0 || action >= actionNum) {
|
||||
mError("trans:%d, invalid action:%d", transId, action);
|
||||
goto _OVER;
|
||||
|
|
|
@ -31,21 +31,21 @@ int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
|
|||
int meteDecodeColCmprEntry(SDecoder *pDecoder, SMetaEntry *pME) {
|
||||
SColCmprWrapper *pWrapper = &pME->colCmpr;
|
||||
if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1;
|
||||
if (pWrapper->nCols == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1;
|
||||
uDebug("dencode cols:%d", pWrapper->nCols);
|
||||
|
||||
pWrapper->pColCmpr = (SColCmpr *)tDecoderMalloc(pDecoder, pWrapper->nCols * sizeof(SColCmpr));
|
||||
if (pWrapper->pColCmpr == NULL) return -1;
|
||||
|
||||
for (int i = 0; i < pWrapper->nCols; i++) {
|
||||
SColCmpr *p = &pWrapper->pColCmpr[i];
|
||||
if (tDecodeI16v(pDecoder, &p->id) < 0) goto END;
|
||||
if (tDecodeU32(pDecoder, &p->alg) < 0) goto END;
|
||||
if (tDecodeI16v(pDecoder, &p->id) < 0) return -1;
|
||||
if (tDecodeU32(pDecoder, &p->alg) < 0) return -1;
|
||||
}
|
||||
return 0;
|
||||
END:
|
||||
// taosMemoryFree(pWrapper->pColCmpr);
|
||||
return -1;
|
||||
}
|
||||
static FORCE_INLINE void metatInitDefaultSColCmprWrapper(SDecoder *pDecoder, SColCmprWrapper *pCmpr,
|
||||
SSchemaWrapper *pSchema) {
|
||||
|
@ -152,6 +152,10 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
if (pME->type == TSDB_SUPER_TABLE) {
|
||||
if (TABLE_IS_COL_COMPRESSED(pME->flags)) {
|
||||
if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1;
|
||||
|
||||
if (pME->colCmpr.nCols == 0) {
|
||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow);
|
||||
}
|
||||
} else {
|
||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->stbEntry.schemaRow);
|
||||
TABLE_SET_COL_COMPRESSED(pME->flags);
|
||||
|
@ -160,6 +164,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
|||
if (!tDecodeIsEnd(pCoder)) {
|
||||
uDebug("set type: %d, tableName:%s", pME->type, pME->name);
|
||||
if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1;
|
||||
if (pME->colCmpr.nCols == 0) {
|
||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow);
|
||||
}
|
||||
} else {
|
||||
uDebug("set default type: %d, tableName:%s", pME->type, pME->name);
|
||||
metatInitDefaultSColCmprWrapper(pCoder, &pME->colCmpr, &pME->ntbEntry.schemaRow);
|
||||
|
|
|
@ -274,8 +274,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
me.name = pReq->name;
|
||||
me.stbEntry.schemaRow = pReq->schemaRow;
|
||||
me.stbEntry.schemaTag = pReq->schemaTag;
|
||||
// me.stbEntry.colCmpr = pReq->colCmpr;
|
||||
// me.stbEntry.colCmpr = pReq->
|
||||
if (pReq->rollup) {
|
||||
TABLE_SET_ROLLUP(me.flags);
|
||||
me.stbEntry.rsmaParam = pReq->rsmaParam;
|
||||
|
@ -283,10 +281,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
|||
if (pReq->colCmpred) {
|
||||
TABLE_SET_COL_COMPRESSED(me.flags);
|
||||
me.colCmpr = pReq->colCmpr;
|
||||
} else {
|
||||
TABLE_SET_COL_COMPRESSED(me.flags);
|
||||
// TODO(yihao)
|
||||
// SETUP default compress algr
|
||||
}
|
||||
|
||||
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
|
||||
|
|
|
@ -287,7 +287,19 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo)
|
|||
|
||||
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
||||
SMetaReader* mr = (SMetaReader*)pContext;
|
||||
bool isTagCol = false, isTbname = false;
|
||||
if (nodeType(*pNode) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pCol = (SColumnNode*)*pNode;
|
||||
if (pCol->colType == COLUMN_TYPE_TBNAME)
|
||||
isTbname = true;
|
||||
else
|
||||
isTagCol = true;
|
||||
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
|
||||
if (pFunc->funcType == FUNCTION_TYPE_TBNAME)
|
||||
isTbname = true;
|
||||
}
|
||||
if (isTagCol) {
|
||||
SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
|
||||
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
|
@ -316,24 +328,21 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
}
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = (SNode*)res;
|
||||
} else if (nodeType(*pNode) == QUERY_NODE_FUNCTION) {
|
||||
SFunctionNode* pFuncNode = *(SFunctionNode**)pNode;
|
||||
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
if (NULL == res) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
res->translate = true;
|
||||
res->node.resType = pFuncNode->node.resType;
|
||||
|
||||
int32_t len = strlen(mr->me.name);
|
||||
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
|
||||
memcpy(varDataVal(res->datum.p), mr->me.name, len);
|
||||
varDataSetLen(res->datum.p, len);
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = (SNode*)res;
|
||||
} else if (isTbname) {
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
if (NULL == res) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
res->translate = true;
|
||||
res->node.resType = ((SExprNode*)(*pNode))->resType;
|
||||
|
||||
int32_t len = strlen(mr->me.name);
|
||||
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
|
||||
memcpy(varDataVal(res->datum.p), mr->me.name, len);
|
||||
varDataSetLen(res->datum.p, len);
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = (SNode*)res;
|
||||
}
|
||||
|
||||
return DEAL_RES_CONTINUE;
|
||||
|
|
|
@ -1640,7 +1640,7 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int
|
|||
printDataBlock(pBlock, "new delete", taskIdStr);
|
||||
}
|
||||
|
||||
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
|
||||
if (pSrcBlock->info.rows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1659,7 +1659,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
}
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) {
|
||||
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||
startData = (TSKEY*)pStartTsCol->pData;
|
||||
endData = (TSKEY*)pEndTsCol->pData;
|
||||
|
@ -1736,7 +1736,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|||
}
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) {
|
||||
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||
startData = (TSKEY*)pStartTsCol->pData;
|
||||
endData = (TSKEY*)pEndTsCol->pData;
|
||||
|
@ -1779,7 +1779,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
|
||||
blockDataCleanup(pDestBlock);
|
||||
if (pSrcBlock->info.rows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1800,7 +1800,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && (srcStartTsCol[0] != srcEndTsCol[0] || hasPrimaryKey(pInfo))) {
|
||||
if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
|
@ -1959,9 +1959,9 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType type) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (isIntervalWindow(pInfo)) {
|
||||
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
|
||||
code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock, type);
|
||||
} else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
|
||||
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
|
||||
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock, type);
|
||||
} else if (isCountWindow(pInfo)) {
|
||||
code = generateCountScanRange(pInfo, pSrcBlock, pDestBlock, type);
|
||||
} else {
|
||||
|
@ -2660,7 +2660,7 @@ FETCH_NEXT_BLOCK:
|
|||
}
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_DELETE_DATA);
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||
|
|
|
@ -185,7 +185,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt
|
|||
return;
|
||||
}
|
||||
pDelRange->win = tmpKey.win;
|
||||
while (mode == STREAM_DELETE_DATA) {
|
||||
while (mode == STREAM_DELETE_DATA || mode == STREAM_PARTITION_DELETE_DATA) {
|
||||
pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur);
|
||||
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -262,7 +262,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
|||
if (chIds) {
|
||||
int32_t childId = getChildIndex(pBlock);
|
||||
if (pInvalidWins) {
|
||||
qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
|
||||
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
|
||||
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
|
||||
}
|
||||
|
||||
|
@ -654,11 +654,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
|
|||
.calWin.skey = nextWin.skey,
|
||||
.calWin.ekey = nextWin.skey};
|
||||
// add pull data request
|
||||
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
|
||||
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
|
||||
if (IS_MID_INTERVAL_OP(pOperator)) {
|
||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
||||
taosArrayPush(pInfo->pMidPullDatas, &winRes);
|
||||
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
|
||||
taosArrayPush(pInfo->pDelWins, &winRes);
|
||||
addPullWindow(pMap, &winRes, numOfCh);
|
||||
if (pInfo->destHasPrimaryKey) {
|
||||
tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0);
|
||||
|
@ -1328,7 +1329,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
|
||||
SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL;
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap);
|
||||
if (IS_FINAL_INTERVAL_OP(pOperator)) {
|
||||
int32_t chId = getChildIndex(pBlock);
|
||||
addRetriveWindow(delWins, pInfo, chId);
|
||||
|
|
|
@ -2517,7 +2517,7 @@ static int32_t translateMd5(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN, .type = TSDB_DATA_TYPE_VARCHAR};
|
||||
pFunc->node.resType = (SDataType){.bytes = MD5_OUTPUT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -511,6 +511,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(seqWinGroup);
|
||||
COPY_SCALAR_FIELD(grpJoin);
|
||||
COPY_SCALAR_FIELD(hashJoinHint);
|
||||
COPY_SCALAR_FIELD(batchScanHint);
|
||||
CLONE_NODE_FIELD(pLeftOnCond);
|
||||
CLONE_NODE_FIELD(pRightOnCond);
|
||||
COPY_SCALAR_FIELD(timeRangeTarget);
|
||||
|
|
|
@ -582,6 +582,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
pJoin->node.inputTsOrder = ORDER_ASC;
|
||||
pJoin->node.groupAction = GROUP_ACTION_CLEAR;
|
||||
pJoin->hashJoinHint = getHashJoinOptHint(pSelect->pHint);
|
||||
pJoin->batchScanHint = getBatchScanOptionFromHint(pSelect->pHint);
|
||||
pJoin->node.requireDataOrder = pJoin->hashJoinHint ? DATA_ORDER_LEVEL_NONE : DATA_ORDER_LEVEL_GLOBAL;
|
||||
pJoin->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
|
||||
pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin;
|
||||
|
|
|
@ -2751,16 +2751,20 @@ static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
|
|||
if (!ret) return ret;
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_PARTITION: {
|
||||
if (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent;
|
||||
if (pWindow->winType == WINDOW_TYPE_INTERVAL) {
|
||||
// if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true
|
||||
// we want to skip groups of blocks after slimit satisfied
|
||||
// if interval only has limit, we do not push down partition node to scan
|
||||
// we want to get grouped output from partition node and make use of limit
|
||||
// if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need
|
||||
// group ordered output
|
||||
if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false;
|
||||
if (pNode->pParent) {
|
||||
if (nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent;
|
||||
if (pWindow->winType == WINDOW_TYPE_INTERVAL) {
|
||||
// if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true
|
||||
// we want to skip groups of blocks after slimit satisfied
|
||||
// if interval only has limit, we do not push down partition node to scan
|
||||
// we want to get grouped output from partition node and make use of limit
|
||||
// if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need
|
||||
// group ordered output
|
||||
if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false;
|
||||
}
|
||||
} else if (nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_JOIN) {
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
} break;
|
||||
|
@ -5607,7 +5611,7 @@ static int32_t grpJoinOptPartByTags(SLogicNode* pNode) {
|
|||
|
||||
static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pNode, SLogicSubplan* pLogicSubplan) {
|
||||
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
|
||||
int32_t code = (pJoin->allEqTags && !pJoin->hasSubQuery) ? grpJoinOptPartByTags(pNode) : grpJoinOptInsertPartitionNode(pNode);
|
||||
int32_t code = (pJoin->allEqTags && !pJoin->hasSubQuery && !pJoin->batchScanHint) ? grpJoinOptPartByTags(pNode) : grpJoinOptInsertPartitionNode(pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pJoin->grpJoin = true;
|
||||
pCxt->optimized = true;
|
||||
|
@ -6328,6 +6332,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
|
|||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = tsmaOptRewriteNodeList(pNewScan->pGroupTags, pTsmaOptCtx, pTsma, true, true);
|
||||
}
|
||||
pTsmaOptCtx->pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
if (pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo && pTsmaOptCtx->pScan->pTsmaTargetTbVgInfo->size > 0) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTsmaOptCtx->pScan->pTsmas); ++i) {
|
||||
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmaOptCtx->pScan->pTsmas, i);
|
||||
|
|
|
@ -141,6 +141,13 @@ class TDTestCase(TBase):
|
|||
tdSql.checkData(i, 5, self.defCompress)
|
||||
tdSql.checkData(i, 6, self.defLevel)
|
||||
|
||||
# geometry encode is disabled
|
||||
sql = f"create table {self.db}.ta(ts timestamp, pos geometry(64)) "
|
||||
tdSql.execute(sql)
|
||||
sql = f"describe {self.db}.ta"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkData(1, 4, "disabled")
|
||||
|
||||
tdLog.info("check default encode compress and level successfully.")
|
||||
|
||||
def checkDataDesc(self, tbname, row, col, value):
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
"password": "taosdata",
|
||||
"connection_pool_size": 8,
|
||||
"num_of_records_per_req": 4000,
|
||||
"prepared_rand": 1000,
|
||||
"thread_count": 2,
|
||||
"prepared_rand": 500,
|
||||
"thread_count": 4,
|
||||
"create_table_thread_count": 1,
|
||||
"confirm_parameter_prompt": "no",
|
||||
"databases": [
|
||||
|
@ -18,20 +18,26 @@
|
|||
"drop": "yes",
|
||||
"vgroups": 2,
|
||||
"replica": 1,
|
||||
"duration":"15d",
|
||||
"flush_each_batch":"yes",
|
||||
"keep": "60d,100d,200d"
|
||||
"duration":"10d",
|
||||
"s3_keeplocal":"30d",
|
||||
"s3_chunksize":"131072",
|
||||
"tsdb_pagesize":"1",
|
||||
"s3_compact":"1",
|
||||
"wal_retention_size":"1",
|
||||
"wal_retention_period":"1",
|
||||
"flush_each_batch":"no",
|
||||
"keep": "3650d"
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "stb",
|
||||
"child_table_exists": "no",
|
||||
"childtable_count": 2,
|
||||
"childtable_count": 10,
|
||||
"insert_rows": 2000000,
|
||||
"childtable_prefix": "d",
|
||||
"insert_mode": "taosc",
|
||||
"timestamp_step": 1000,
|
||||
"start_timestamp":"now-90d",
|
||||
"start_timestamp": 1600000000000,
|
||||
"columns": [
|
||||
{ "type": "bool", "name": "bc"},
|
||||
{ "type": "float", "name": "fc" },
|
|
@ -0,0 +1,351 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import time
|
||||
import random
|
||||
|
||||
import taos
|
||||
import frame
|
||||
import frame.etool
|
||||
import frame.eos
|
||||
|
||||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame.srvCtl import *
|
||||
from frame import *
|
||||
from frame.eos import *
|
||||
|
||||
|
||||
#
|
||||
# 192.168.1.52 MINIO S3
|
||||
#
|
||||
|
||||
'''
|
||||
s3EndPoint http://192.168.1.52:9000
|
||||
s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
|
||||
s3BucketName ci-bucket
|
||||
s3UploadDelaySec 60
|
||||
'''
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
updatecfgDict = {
|
||||
"supportVnodes":"1000",
|
||||
's3EndPoint': 'http://192.168.1.52:9000',
|
||||
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
|
||||
's3BucketName': 'ci-bucket',
|
||||
's3PageCacheSize': '10240',
|
||||
"s3UploadDelaySec": "10",
|
||||
's3MigrateIntervalSec': '600',
|
||||
's3MigrateEnabled': '1'
|
||||
}
|
||||
|
||||
maxFileSize = (128 + 10) * 1014 * 1024 # add 10M buffer
|
||||
|
||||
def insertData(self):
|
||||
tdLog.info(f"insert data.")
|
||||
# taosBenchmark run
|
||||
json = etool.curFile(__file__, "s3Basic.json")
|
||||
etool.benchMark(json=json)
|
||||
|
||||
tdSql.execute(f"use {self.db}")
|
||||
# come from s3_basic.json
|
||||
self.childtable_count = 10
|
||||
self.insert_rows = 2000000
|
||||
self.timestamp_step = 1000
|
||||
|
||||
def createStream(self, sname):
|
||||
sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
|
||||
tdSql.execute(sql)
|
||||
|
||||
def migrateDbS3(self):
|
||||
sql = f"s3migrate database {self.db}"
|
||||
tdSql.execute(sql, show=True)
|
||||
|
||||
def checkDataFile(self, lines, maxFileSize):
|
||||
# ls -l
|
||||
# -rwxrwxrwx 1 root root 41652224 Apr 17 14:47 vnode2/tsdb/v2f1974ver47.3.data
|
||||
overCnt = 0
|
||||
for line in lines:
|
||||
cols = line.split()
|
||||
fileSize = int(cols[4])
|
||||
fileName = cols[8]
|
||||
#print(f" filesize={fileSize} fileName={fileName} line={line}")
|
||||
if fileSize > maxFileSize:
|
||||
tdLog.info(f"error, {fileSize} over max size({maxFileSize})\n")
|
||||
overCnt += 1
|
||||
else:
|
||||
tdLog.info(f"{fileName}({fileSize}) check size passed.")
|
||||
|
||||
return overCnt
|
||||
|
||||
def checkUploadToS3(self):
|
||||
rootPath = sc.clusterRootPath()
|
||||
cmd = f"ls -l {rootPath}/dnode*/data/vnode/vnode*/tsdb/*.data"
|
||||
tdLog.info(cmd)
|
||||
loop = 0
|
||||
rets = []
|
||||
overCnt = 0
|
||||
while loop < 180:
|
||||
time.sleep(3)
|
||||
|
||||
# check upload to s3
|
||||
rets = eos.runRetList(cmd)
|
||||
cnt = len(rets)
|
||||
if cnt == 0:
|
||||
overCnt = 0
|
||||
tdLog.info("All data file upload to server over.")
|
||||
break
|
||||
overCnt = self.checkDataFile(rets, self.maxFileSize)
|
||||
if overCnt == 0:
|
||||
uploadOK = True
|
||||
tdLog.info(f"All data files({len(rets)}) size bellow {self.maxFileSize}, check upload to s3 ok.")
|
||||
break
|
||||
|
||||
tdLog.info(f"loop={loop} no upload {overCnt} data files wait 3s retry ...")
|
||||
if loop == 3:
|
||||
sc.dnodeStop(1)
|
||||
time.sleep(2)
|
||||
sc.dnodeStart(1)
|
||||
loop += 1
|
||||
# miggrate
|
||||
self.migrateDbS3()
|
||||
|
||||
# check can pass
|
||||
if overCnt > 0:
|
||||
tdLog.exit(f"s3 have {overCnt} files over size.")
|
||||
|
||||
|
||||
def doAction(self):
|
||||
tdLog.info(f"do action.")
|
||||
|
||||
self.flushDb(show=True)
|
||||
#self.compactDb(show=True)
|
||||
|
||||
# sleep 70s
|
||||
self.migrateDbS3()
|
||||
|
||||
# check upload to s3
|
||||
self.checkUploadToS3()
|
||||
|
||||
def checkStreamCorrect(self):
|
||||
sql = f"select count(*) from {self.db}.stm1"
|
||||
count = 0
|
||||
for i in range(120):
|
||||
tdSql.query(sql)
|
||||
count = tdSql.getData(0, 0)
|
||||
if count == 100000 or count == 100001:
|
||||
return True
|
||||
time.sleep(1)
|
||||
|
||||
tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}")
|
||||
|
||||
|
||||
def checkCreateDb(self, keepLocal, chunkSize, compact):
|
||||
# keyword
|
||||
kw1 = kw2 = kw3 = ""
|
||||
if keepLocal is not None:
|
||||
kw1 = f"s3_keeplocal {keepLocal}"
|
||||
if chunkSize is not None:
|
||||
kw2 = f"s3_chunksize {chunkSize}"
|
||||
if compact is not None:
|
||||
kw3 = f"s3_compact {compact}"
|
||||
|
||||
sql = f" create database db1 duration 1h {kw1} {kw2} {kw3}"
|
||||
tdSql.execute(sql, show=True)
|
||||
#sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';"
|
||||
sql = f"select * from information_schema.ins_databases where name='db1';"
|
||||
tdSql.query(sql)
|
||||
# 29 30 31 -> chunksize keeplocal compact
|
||||
if chunkSize is not None:
|
||||
tdSql.checkData(0, 29, chunkSize)
|
||||
if keepLocal is not None:
|
||||
keepLocalm = keepLocal * 24 * 60
|
||||
tdSql.checkData(0, 30, f"{keepLocalm}m")
|
||||
if compact is not None:
|
||||
tdSql.checkData(0, 31, compact)
|
||||
sql = "drop database db1"
|
||||
tdSql.execute(sql)
|
||||
|
||||
def checkExcept(self):
|
||||
# errors
|
||||
sqls = [
|
||||
f"create database db2 s3_keeplocal -1",
|
||||
f"create database db2 s3_keeplocal 0",
|
||||
f"create database db2 s3_keeplocal 365001",
|
||||
f"create database db2 s3_chunksize -1",
|
||||
f"create database db2 s3_chunksize 0",
|
||||
f"create database db2 s3_chunksize 900000000",
|
||||
f"create database db2 s3_compact -1",
|
||||
f"create database db2 s3_compact 100",
|
||||
f"create database db2 duration 1d s3_keeplocal 1d"
|
||||
]
|
||||
tdSql.errors(sqls)
|
||||
|
||||
|
||||
def checkBasic(self):
|
||||
# create db
|
||||
keeps = [1, 256, 1024, 365000, None]
|
||||
chunks = [131072, 600000, 820000, 1048576, None]
|
||||
comps = [0, 1, None]
|
||||
|
||||
for keep in keeps:
|
||||
for chunk in chunks:
|
||||
for comp in comps:
|
||||
self.checkCreateDb(keep, chunk, comp)
|
||||
|
||||
|
||||
# --checks3
|
||||
idx = 1
|
||||
taosd = sc.taosdFile(idx)
|
||||
cfg = sc.dnodeCfgPath(idx)
|
||||
cmd = f"{taosd} -c {cfg} --checks3"
|
||||
|
||||
eos.exe(cmd)
|
||||
#output, error = eos.run(cmd)
|
||||
#print(lines)
|
||||
|
||||
'''
|
||||
tips = [
|
||||
"put object s3test.txt: success",
|
||||
"listing bucket ci-bucket: success",
|
||||
"get object s3test.txt: success",
|
||||
"delete object s3test.txt: success"
|
||||
]
|
||||
pos = 0
|
||||
for tip in tips:
|
||||
pos = output.find(tip, pos)
|
||||
#if pos == -1:
|
||||
# tdLog.exit(f"checks3 failed not found {tip}. cmd={cmd} output={output}")
|
||||
'''
|
||||
|
||||
# except
|
||||
self.checkExcept()
|
||||
|
||||
#
|
||||
def preDb(self, vgroups):
|
||||
|
||||
cnt = int(time.time())%3 + 1
|
||||
for i in range(cnt):
|
||||
vg = int(time.time()*1000)%10 + 1
|
||||
sql = f"create database predb vgroups {vg}"
|
||||
tdSql.execute(sql, show=True)
|
||||
sql = "drop database predb"
|
||||
tdSql.execute(sql, show=True)
|
||||
|
||||
# history
|
||||
def insertHistory(self):
|
||||
tdLog.info(f"insert history data.")
|
||||
# taosBenchmark run
|
||||
json = etool.curFile(__file__, "s3Basic1.json")
|
||||
etool.benchMark(json=json)
|
||||
|
||||
# come from s3_basic.json
|
||||
self.insert_rows += self.insert_rows/4
|
||||
self.timestamp_step = 500
|
||||
|
||||
# delete
|
||||
def checkDelete(self):
|
||||
# del 1000 rows
|
||||
start = 1600000000000
|
||||
drows = 200
|
||||
for i in range(1, drows, 2):
|
||||
sql = f"from {self.db}.{self.stb} where ts = {start + i*500}"
|
||||
tdSql.execute("delete " + sql, show=True)
|
||||
tdSql.query("select * " + sql)
|
||||
tdSql.checkRows(0)
|
||||
|
||||
# delete all 500 step
|
||||
self.flushDb()
|
||||
self.compactDb()
|
||||
self.insert_rows -= drows/2
|
||||
sql = f"select count(*) from {self.db}.{self.stb}"
|
||||
tdSql.checkAgg(sql, self.insert_rows * self.childtable_count)
|
||||
|
||||
# delete 10W rows from 100000
|
||||
drows = 100000
|
||||
sdel = start + 100000 * self.timestamp_step
|
||||
edel = start + 100000 * self.timestamp_step + drows * self.timestamp_step
|
||||
sql = f"from {self.db}.{self.stb} where ts >= {sdel} and ts < {edel}"
|
||||
tdSql.execute("delete " + sql, show=True)
|
||||
tdSql.query("select * " + sql)
|
||||
tdSql.checkRows(0)
|
||||
|
||||
self.insert_rows -= drows
|
||||
sql = f"select count(*) from {self.db}.{self.stb}"
|
||||
tdSql.checkAgg(sql, self.insert_rows * self.childtable_count)
|
||||
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.sname = "stream1"
|
||||
if eos.isArm64Cpu():
|
||||
tdLog.success(f"{__file__} arm64 ignore executed")
|
||||
else:
|
||||
|
||||
self.preDb(10)
|
||||
|
||||
# insert data
|
||||
self.insertData()
|
||||
|
||||
# creat stream
|
||||
self.createStream(self.sname)
|
||||
|
||||
# check insert data correct
|
||||
#self.checkInsertCorrect()
|
||||
|
||||
# save
|
||||
self.snapshotAgg()
|
||||
|
||||
# do action
|
||||
self.doAction()
|
||||
|
||||
# check save agg result correct
|
||||
self.checkAggCorrect()
|
||||
|
||||
# check insert correct again
|
||||
self.checkInsertCorrect()
|
||||
|
||||
# checkBasic
|
||||
self.checkBasic()
|
||||
|
||||
# check stream correct and drop stream
|
||||
#self.checkStreamCorrect()
|
||||
|
||||
# drop stream
|
||||
self.dropStream(self.sname)
|
||||
|
||||
# insert history disorder data
|
||||
self.insertHistory()
|
||||
#self.checkInsertCorrect()
|
||||
self.snapshotAgg()
|
||||
self.doAction()
|
||||
self.checkAggCorrect()
|
||||
self.checkInsertCorrect(difCnt=self.childtable_count*999999)
|
||||
self.checkDelete()
|
||||
self.doAction()
|
||||
|
||||
# drop database and free s3 file
|
||||
self.dropDb()
|
||||
|
||||
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,66 @@
|
|||
{
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"connection_pool_size": 8,
|
||||
"num_of_records_per_req": 5000,
|
||||
"prepared_rand": 500,
|
||||
"thread_count": 4,
|
||||
"create_table_thread_count": 1,
|
||||
"confirm_parameter_prompt": "no",
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
"name": "db",
|
||||
"drop": "no",
|
||||
"vgroups": 2,
|
||||
"replica": 1,
|
||||
"duration":"10d",
|
||||
"s3_keeplocal":"30d",
|
||||
"s3_chunksize":"131072",
|
||||
"tsdb_pagesize":"1",
|
||||
"s3_compact":"1",
|
||||
"wal_retention_size":"1",
|
||||
"wal_retention_period":"1",
|
||||
"flush_each_batch":"no",
|
||||
"keep": "3650d"
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "stb",
|
||||
"child_table_exists": "yes",
|
||||
"childtable_count": 10,
|
||||
"insert_rows": 1000000,
|
||||
"childtable_prefix": "d",
|
||||
"insert_mode": "taosc",
|
||||
"timestamp_step": 500,
|
||||
"start_timestamp": 1600000000000,
|
||||
"columns": [
|
||||
{ "type": "bool", "name": "bc"},
|
||||
{ "type": "float", "name": "fc" },
|
||||
{ "type": "double", "name": "dc"},
|
||||
{ "type": "tinyint", "name": "ti"},
|
||||
{ "type": "smallint", "name": "si" },
|
||||
{ "type": "int", "name": "ic" ,"max": 1,"min": 1},
|
||||
{ "type": "bigint", "name": "bi" },
|
||||
{ "type": "utinyint", "name": "uti"},
|
||||
{ "type": "usmallint", "name": "usi"},
|
||||
{ "type": "uint", "name": "ui" },
|
||||
{ "type": "ubigint", "name": "ubi"},
|
||||
{ "type": "binary", "name": "bin", "len": 32},
|
||||
{ "type": "nchar", "name": "nch", "len": 64}
|
||||
],
|
||||
"tags": [
|
||||
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
|
||||
{"name": "location","type": "binary", "len": 16, "values":
|
||||
["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -1,157 +0,0 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
import taos
|
||||
import frame
|
||||
import frame.etool
|
||||
import frame.eos
|
||||
|
||||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame.srvCtl import *
|
||||
from frame import *
|
||||
from frame.eos import *
|
||||
|
||||
#
|
||||
# 192.168.1.52 MINIO S3
|
||||
#
|
||||
|
||||
'''
|
||||
s3EndPoint http://192.168.1.52:9000
|
||||
s3AccessKey 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
|
||||
s3BucketName ci-bucket
|
||||
s3UploadDelaySec 60
|
||||
'''
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
updatecfgDict = {
|
||||
's3EndPoint': 'http://192.168.1.52:9000',
|
||||
's3AccessKey': 'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX',
|
||||
's3BucketName': 'ci-bucket',
|
||||
's3BlockSize': '10240',
|
||||
's3BlockCacheSize': '320',
|
||||
's3PageCacheSize': '10240',
|
||||
's3UploadDelaySec':'60'
|
||||
}
|
||||
|
||||
def insertData(self):
|
||||
tdLog.info(f"insert data.")
|
||||
# taosBenchmark run
|
||||
json = etool.curFile(__file__, "s3_basic.json")
|
||||
etool.benchMark(json=json)
|
||||
|
||||
tdSql.execute(f"use {self.db}")
|
||||
# come from s3_basic.json
|
||||
self.childtable_count = 2
|
||||
self.insert_rows = 2000000
|
||||
self.timestamp_step = 1000
|
||||
|
||||
def createStream(self, sname):
|
||||
sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
|
||||
tdSql.execute(sql)
|
||||
|
||||
def doAction(self):
|
||||
tdLog.info(f"do action.")
|
||||
|
||||
self.flushDb()
|
||||
self.compactDb()
|
||||
|
||||
# sleep 70s
|
||||
tdLog.info(f"wait 65s ...")
|
||||
time.sleep(65)
|
||||
self.trimDb(True)
|
||||
|
||||
rootPath = sc.clusterRootPath()
|
||||
cmd = f"ls {rootPath}/dnode1/data2*/vnode/vnode*/tsdb/*.data"
|
||||
tdLog.info(cmd)
|
||||
loop = 0
|
||||
rets = []
|
||||
while loop < 180:
|
||||
time.sleep(3)
|
||||
rets = eos.runRetList(cmd)
|
||||
cnt = len(rets)
|
||||
if cnt == 0:
|
||||
tdLog.info("All data file upload to server over.")
|
||||
break
|
||||
self.trimDb(True)
|
||||
tdLog.info(f"loop={loop} no upload {cnt} data files wait 3s retry ...")
|
||||
if loop == 0:
|
||||
sc.dnodeStop(1)
|
||||
time.sleep(2)
|
||||
sc.dnodeStart(1)
|
||||
loop += 1
|
||||
|
||||
if len(rets) > 0:
|
||||
tdLog.exit(f"s3 can not upload all data to server. data files cnt={len(rets)} list={rets}")
|
||||
|
||||
def checkStreamCorrect(self):
|
||||
sql = f"select count(*) from {self.db}.stm1"
|
||||
count = 0
|
||||
for i in range(120):
|
||||
tdSql.query(sql)
|
||||
count = tdSql.getData(0, 0)
|
||||
if count == 100000 or count == 100001:
|
||||
return True
|
||||
time.sleep(1)
|
||||
|
||||
tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}")
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.sname = "stream1"
|
||||
if eos.isArm64Cpu():
|
||||
tdLog.success(f"{__file__} arm64 ignore executed")
|
||||
else:
|
||||
# insert data
|
||||
self.insertData()
|
||||
|
||||
# creat stream
|
||||
self.createStream(self.sname)
|
||||
|
||||
# check insert data correct
|
||||
self.checkInsertCorrect()
|
||||
|
||||
# save
|
||||
self.snapshotAgg()
|
||||
|
||||
# do action
|
||||
self.doAction()
|
||||
|
||||
# check save agg result correct
|
||||
self.checkAggCorrect()
|
||||
|
||||
# check insert correct again
|
||||
self.checkInsertCorrect()
|
||||
|
||||
# check stream correct and drop stream
|
||||
#self.checkStreamCorrect()
|
||||
|
||||
# drop stream
|
||||
self.dropStream(self.sname)
|
||||
|
||||
# drop database and free s3 file
|
||||
self.dropDb()
|
||||
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -129,7 +129,7 @@ class TBase:
|
|||
#
|
||||
|
||||
# basic
|
||||
def checkInsertCorrect(self):
|
||||
def checkInsertCorrect(self, difCnt = 0):
|
||||
# check count
|
||||
sql = f"select count(*) from {self.stb}"
|
||||
tdSql.checkAgg(sql, self.childtable_count * self.insert_rows)
|
||||
|
@ -139,9 +139,8 @@ class TBase:
|
|||
tdSql.checkAgg(sql, self.childtable_count)
|
||||
|
||||
# check step
|
||||
sql = f"select * from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(0)
|
||||
sql = f"select count(*) from (select diff(ts) as dif from {self.stb} partition by tbname order by ts desc) where dif != {self.timestamp_step}"
|
||||
#tdSql.checkAgg(sql, difCnt)
|
||||
|
||||
# save agg result
|
||||
def snapshotAgg(self):
|
||||
|
|
|
@ -146,6 +146,10 @@ class TDDnodes:
|
|||
if index < 1 or index > 10:
|
||||
tdLog.exit("index:%d should on a scale of [1, 10]" % (index))
|
||||
|
||||
def taosdFile(self, index):
|
||||
self.check(index)
|
||||
return self.dnodes[index - 1].getPath()
|
||||
|
||||
def StopAllSigint(self):
|
||||
tdLog.info("stop all dnodes sigint, asan:%d" % self.asan)
|
||||
if self.asan:
|
||||
|
|
|
@ -658,6 +658,7 @@ class TDSql:
|
|||
def checkAgg(self, sql, expectCnt):
|
||||
self.query(sql)
|
||||
self.checkData(0, 0, expectCnt)
|
||||
tdLog.info(f"{sql} expect {expectCnt} ok.")
|
||||
|
||||
# expect first value
|
||||
def checkFirstValue(self, sql, expect):
|
||||
|
|
|
@ -62,6 +62,15 @@ class srvCtl:
|
|||
return clusterDnodes.getDnodesRootDir()
|
||||
|
||||
return tdDnodes.getDnodesRootDir()
|
||||
|
||||
# get taosd path
|
||||
def taosdFile(self, idx):
|
||||
if clusterDnodes.getModel() == 'cluster':
|
||||
return clusterDnodes.taosdFile(idx)
|
||||
|
||||
return tdDnodes.taosdFile(idx)
|
||||
|
||||
|
||||
|
||||
# return dnode data files list
|
||||
def dnodeDataFiles(self, idx):
|
||||
|
|
|
@ -114,7 +114,7 @@ if __name__ == "__main__":
|
|||
level = 1
|
||||
disk = 1
|
||||
|
||||
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aP:L:D:', [
|
||||
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:N:M:Q:C:RWU:n:i:aPL:D:', [
|
||||
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd','dnodeNums','mnodeNums',
|
||||
'queryPolicy','createDnodeNums','restful','websocket','adaptercfgupdate','replicaVar','independentMnode',"asan",'previous','level','disk'])
|
||||
for key, value in opts:
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
# army-test
|
||||
#
|
||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/multi-level/mlevel_basic.py -N 3 -L 3 -D 2
|
||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3_basic.py -L 3 -D 1
|
||||
,,y,army,./pytest.sh python3 ./test.py -f enterprise/s3/s3Basic.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2
|
||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
|
||||
|
@ -128,7 +128,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
||||
|
|
|
@ -1209,7 +1209,7 @@ class TDTestCase:
|
|||
self.test_ddl()
|
||||
self.test_query_with_tsma()
|
||||
# bug to fix
|
||||
# self.test_flush_query()
|
||||
self.test_flush_query()
|
||||
|
||||
#cluster test
|
||||
cluster_dnode_list = tdSql.get_cluseter_dnodes()
|
||||
|
@ -1231,14 +1231,22 @@ class TDTestCase:
|
|||
# self.test_drop_ctable()
|
||||
self.test_drop_db()
|
||||
|
||||
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float):
|
||||
def wait_query(self, sql: str, expected_row_num: int, timeout_in_seconds: float, is_expect_row = None):
|
||||
timeout = timeout_in_seconds
|
||||
tdSql.query(sql)
|
||||
while timeout > 0 and tdSql.getRows() != expected_row_num:
|
||||
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {tdSql.getRows()}, remain: {timeout_in_seconds - timeout}')
|
||||
rows: int = 0
|
||||
for row in tdSql.queryResult:
|
||||
if is_expect_row is None or is_expect_row(row):
|
||||
rows = rows + 1
|
||||
while timeout > 0 and rows != expected_row_num:
|
||||
tdLog.debug(f'start to wait query: {sql} to return {expected_row_num}, got: {str(tdSql.queryResult)} useful rows: {rows}, remain: {timeout_in_seconds - timeout}')
|
||||
time.sleep(1)
|
||||
timeout = timeout - 1
|
||||
tdSql.query(sql)
|
||||
rows = 0
|
||||
for row in tdSql.queryResult:
|
||||
if is_expect_row is None or is_expect_row(row):
|
||||
rows = rows + 1
|
||||
if timeout <= 0:
|
||||
tdLog.exit(f'failed to wait query: {sql} to return {expected_row_num} rows timeout: {timeout_in_seconds}s')
|
||||
else:
|
||||
|
@ -1255,7 +1263,7 @@ class TDTestCase:
|
|||
tdSql.error('drop tsma test.tsma1', -2147482491)
|
||||
tdSql.execute('drop tsma test.tsma2', queryTimes=1)
|
||||
tdSql.execute('drop tsma test.tsma1', queryTimes=1)
|
||||
self.wait_query('show transactions', 0, 10)
|
||||
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
|
||||
tdSql.execute('drop database test', queryTimes=1)
|
||||
|
||||
self.init_data()
|
||||
|
@ -1296,7 +1304,7 @@ class TDTestCase:
|
|||
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2), avg(t3)) interval(5m)', -2147471096)
|
||||
|
||||
tdSql.execute('alter table nsdb.meters drop tag t3', queryTimes=1)
|
||||
self.wait_query('show transactions', 0, 10)
|
||||
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
|
||||
tdSql.execute('drop database nsdb')
|
||||
|
||||
# drop norm table
|
||||
|
@ -1323,7 +1331,7 @@ class TDTestCase:
|
|||
# test drop stream
|
||||
tdSql.error('drop stream tsma1', -2147471088) ## TSMA must be dropped first
|
||||
|
||||
self.wait_query('show transactions', 0, 10)
|
||||
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
|
||||
tdSql.execute('drop database test', queryTimes=1)
|
||||
self.init_data()
|
||||
|
||||
|
@ -1424,7 +1432,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.error(
|
||||
'create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097)
|
||||
self.wait_query('show transactions', 0, 10)
|
||||
self.wait_query('show transactions', 0, 10, lambda row: row[3] != 'stream-checkpo')
|
||||
tdSql.execute('drop database nsdb')
|
||||
|
||||
def test_create_tsma_on_norm_table(self):
|
||||
|
@ -1569,7 +1577,7 @@ class TDTestCase:
|
|||
tdSql.error('create tsma tsma_illegal on test.meters function(avg(c8)) interval(5m)',-2147473406)
|
||||
|
||||
def test_flush_query(self):
|
||||
tdSql.execute('insert into test.norm_tb (ts,c1_new,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1_new),avg(c2) from test.norm_tb interval(10m);select avg(c1_new),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
|
||||
tdSql.execute('insert into test.norm_tb (ts,c1,c2) values (now,1,2)(now+1s,2,3)(now+2s,2,3)(now+3s,2,3) (now+4s,1,2)(now+5s,2,3)(now+6s,2,3)(now+7s,2,3); select /*+ skip_tsma()*/ avg(c1),avg(c2) from test.norm_tb interval(10m);select avg(c1),avg(c2) from test.norm_tb interval(10m);select * from information_schema.ins_stream_tasks;', queryTimes=1)
|
||||
tdSql.execute('flush database test', queryTimes=1)
|
||||
tdSql.query('select count(*) from test.meters', queryTimes=1)
|
||||
tdSql.checkData(0,0,100000)
|
||||
|
|
|
@ -61,8 +61,8 @@ python3 ./test.py -f 7-tmq/subscribeStb3.py
|
|||
python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
|
||||
python3 ./test.py -f 7-tmq/ins_topics_test.py
|
||||
python3 ./test.py -f 7-tmq/tmqMaxTopic.py
|
||||
python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||
python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
||||
#python3 ./test.py -f 7-tmq/tmqParamsTest.py
|
||||
#python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
|
||||
python3 ./test.py -f 7-tmq/tmqClientConsLog.py
|
||||
python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
|
||||
python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
|
||||
|
|
Loading…
Reference in New Issue