other: merge enh/rocksdbstate
This commit is contained in:
commit
236926b9ee
|
@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java
|
|||
|
||||
# 加入技术交流群
|
||||
|
||||
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。
|
||||
TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine1",加小 T 为好友,即可入群。
|
||||
|
|
|
@ -246,6 +246,11 @@ if(${BUILD_WITH_ROCKSDB})
|
|||
option(WITH_MD_LIBRARY "build with MD" OFF)
|
||||
set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
|
||||
endif(${TD_WINDOWS})
|
||||
|
||||
if(${TD_WINDOWS})
|
||||
option(WITH_MD_LIBRARY "build with MD" OFF)
|
||||
set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
|
||||
endif(${TD_WINDOWS})
|
||||
|
||||
|
||||
option(WITH_FALLOCATE "" OFF)
|
||||
|
|
|
@ -207,8 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
|
|||
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
|
||||
|
||||
void* streamQueueNextItem(SStreamQueue* queue);
|
||||
|
||||
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type);
|
||||
|
@ -241,6 +239,7 @@ typedef struct {
|
|||
void* vnode; // not available to encoder and decoder
|
||||
FTbSink* tbSinkFunc;
|
||||
STSchema* pTSchema;
|
||||
SSHashObj* pTblInfo;
|
||||
} STaskSinkTb;
|
||||
|
||||
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
|
||||
|
|
|
@ -55,7 +55,7 @@ else
|
|||
exit $?
|
||||
fi
|
||||
while true; do
|
||||
es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check)
|
||||
es=$(taos -h $FIRST_EP_HOST -P $FIRST_EP_PORT --check | grep "^[0-9]*:")
|
||||
echo ${es}
|
||||
if [ "${es%%:*}" -eq 2 ]; then
|
||||
echo "execute create dnode"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#!/bin/sh
|
||||
es=$(taos --check)
|
||||
es=$(taos --check | grep "^[0-9]*:")
|
||||
code=${es%%:*}
|
||||
if [ "$code" -ne "0" ] && [ "$code" -ne "4" ]; then
|
||||
exit 0
|
||||
|
|
|
@ -1359,7 +1359,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
|||
}
|
||||
taosArrayPush(info->pRequest->tableList, &pName);
|
||||
|
||||
tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1);
|
||||
strcpy(pName.tname, tableData->childTableName);
|
||||
|
||||
SRequestConnInfo conn = {0};
|
||||
conn.pTrans = info->taos->pAppInfo->pTransporter;
|
||||
|
|
|
@ -84,7 +84,7 @@ bool tsMonitorComp = false;
|
|||
// telem
|
||||
bool tsEnableTelem = true;
|
||||
int32_t tsTelemInterval = 43200;
|
||||
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
|
||||
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
|
||||
uint16_t tsTelemPort = 80;
|
||||
char *tsTelemUri = "/report";
|
||||
|
||||
|
|
|
@ -87,18 +87,6 @@ static void dmStopDnode(int signum, void *sigInfo, void *context) {
|
|||
}
|
||||
|
||||
void dmLogCrash(int signum, void *sigInfo, void *context) {
|
||||
taosIgnSignal(SIGTERM);
|
||||
taosIgnSignal(SIGHUP);
|
||||
taosIgnSignal(SIGINT);
|
||||
taosIgnSignal(SIGBREAK);
|
||||
|
||||
#ifndef WINDOWS
|
||||
taosIgnSignal(SIGBUS);
|
||||
#endif
|
||||
taosIgnSignal(SIGABRT);
|
||||
taosIgnSignal(SIGFPE);
|
||||
taosIgnSignal(SIGSEGV);
|
||||
|
||||
char *pMsg = NULL;
|
||||
const char *flags = "UTL FATAL ";
|
||||
ELogLevel level = DEBUG_FATAL;
|
||||
|
|
|
@ -256,10 +256,13 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
|
||||
pDb = mndAcquireDb(pMnode, db);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||
mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
||||
terrstr());
|
||||
goto _OVER;
|
||||
if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) &&
|
||||
(0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) {
|
||||
terrno = TSDB_CODE_MND_INVALID_DB;
|
||||
mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
|
||||
terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
|
||||
|
|
|
@ -261,13 +261,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
|
|||
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
|
||||
|
||||
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
|
||||
int32_t tqNextBlockInWal(STqReader *pReader);
|
||||
bool tqNextBlockImpl(STqReader *pReader);
|
||||
int32_t tqNextBlockInWal(STqReader* pReader);
|
||||
bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
|
||||
|
||||
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
|
||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
|
||||
int32_t tqRetrieveDataBlock(STqReader *pReader, const char* idstr);
|
||||
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
|
||||
|
||||
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
|
|
|
@ -101,6 +101,8 @@ typedef struct {
|
|||
STqPushHandle pushHandle; // push
|
||||
STqExecHandle execHandle; // exec
|
||||
SRpcMsg* msg;
|
||||
int32_t noDataPollCnt;
|
||||
int8_t exec;
|
||||
} STqHandle;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -753,6 +753,10 @@ end:
|
|||
return ret;
|
||||
}
|
||||
|
||||
void freePtr(void *ptr) {
|
||||
taosMemoryFree(*(void**)ptr);
|
||||
}
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
||||
|
@ -789,6 +793,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
|
@ -802,10 +807,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
if (pTask->exec.pExecutor == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
|
||||
}
|
||||
|
||||
// sink
|
||||
/*pTask->ahandle = pTq->pVnode;*/
|
||||
if (pTask->outputType == TASK_OUTPUT__SMA) {
|
||||
pTask->smaSink.vnode = pTq->pVnode;
|
||||
pTask->smaSink.smaSink = smaHandleRes;
|
||||
|
@ -825,6 +831,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
if (pTask->tbSink.pTSchema == NULL) {
|
||||
return -1;
|
||||
}
|
||||
pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
|
||||
}
|
||||
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
|
|
|
@ -310,6 +310,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
|||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
} else {
|
||||
tqPushDataRsp(pTq, pHandle);
|
||||
void* tmp = pHandle->msg->pCont;
|
||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||
pHandle->msg->pCont = tmp;
|
||||
|
|
|
@ -419,15 +419,15 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool tqNextBlockImpl(STqReader* pReader) {
|
||||
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||
if (pReader->msg.msgStr == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
while (pReader->nextBlk < blockSz) {
|
||||
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen,
|
||||
pReader->msg.ver, pReader->nextBlk);
|
||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
while (pReader->nextBlk < numOfBlocks) {
|
||||
tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen,
|
||||
pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||
if (pReader->tbIdHash == NULL) {
|
||||
|
@ -503,13 +503,11 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
|
||||
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
|
||||
int32_t tqRetrieveDataBlock(STqReader* pReader, const char* idstr) {
|
||||
tqDebug("tq reader retrieve data block %p, index:%d/%d, %s", pReader->msg.msgStr, pReader->nextBlk,
|
||||
(int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), idstr);
|
||||
|
||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
|
||||
if (pSubmitTbDataRet) {
|
||||
*pSubmitTbDataRet = pSubmitTbData;
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = pReader->pResBlock;
|
||||
blockDataCleanup(pBlock);
|
||||
|
@ -674,12 +672,10 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet
|
|||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
|
||||
while (1) {
|
||||
SColVal colVal;
|
||||
tqDebug("start to extract column id:%d, index:%d", pColData->info.colId, sourceIdx);
|
||||
|
||||
tRowGet(pRow, pTSchema, sourceIdx, &colVal);
|
||||
if (colVal.cid < pColData->info.colId) {
|
||||
tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
|
||||
sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
|
||||
// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in schema:%d",
|
||||
// sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
|
||||
sourceIdx++;
|
||||
continue;
|
||||
} else if (colVal.cid == pColData->info.colId) {
|
||||
|
|
|
@ -57,29 +57,6 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
<<<<<<< HEAD
|
||||
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
|
||||
SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
|
||||
=======
|
||||
int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
|
||||
>>>>>>> enh/3.0
|
||||
void* pIter = NULL;
|
||||
|
||||
taosWLockLatch(&pStreamMeta->lock);
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
taosArrayPush(pTaskIdList, &pTask->id.taskId);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStreamMeta->lock);
|
||||
return pTaskIdList;
|
||||
}
|
||||
|
||||
int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||
*pScanIdle = true;
|
||||
bool noNewDataInWal = true;
|
||||
|
@ -143,6 +120,8 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// append the data for the stream
|
||||
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||
} else {
|
||||
|
|
|
@ -205,7 +205,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
|
|||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
STqReader* pReader = pExec->pTqReader;
|
||||
tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
|
||||
while (tqNextBlockImpl(pReader)) {
|
||||
while (tqNextBlockImpl(pReader, NULL)) {
|
||||
taosArrayClear(pBlocks);
|
||||
taosArrayClear(pSchemas);
|
||||
SSubmitTbData* pSubmitTbDataRet = NULL;
|
||||
|
|
|
@ -17,6 +17,13 @@
|
|||
#include "tmsg.h"
|
||||
#include "tq.h"
|
||||
|
||||
#define MAX_CATCH_NUM 10240
|
||||
|
||||
typedef struct STblInfo {
|
||||
uint64_t uid;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} STblInfo;
|
||||
|
||||
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
|
||||
const char* pIdStr) {
|
||||
int32_t totalRows = pDataBlock->info.rows;
|
||||
|
@ -90,6 +97,22 @@ end:
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) {
|
||||
void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t));
|
||||
if (pVal) {
|
||||
*pTbl = *(STblInfo**)pVal;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) {
|
||||
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
|
||||
}
|
||||
|
||||
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||
void* buf = NULL;
|
||||
int32_t tlen = 0;
|
||||
|
@ -260,100 +283,112 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
tbData.suid = suid;
|
||||
tbData.uid = 0; // uid is assigned by vnode
|
||||
tbData.sver = pTSchema->version;
|
||||
STblInfo* pTblMeta = NULL;
|
||||
|
||||
char* ctbName = NULL;
|
||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName);
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
ctbName = taosStrdup(pDataBlock->info.parTbName);
|
||||
} else {
|
||||
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
||||
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta);
|
||||
if (res != TSDB_CODE_SUCCESS) {
|
||||
pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo));
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
|
||||
|
||||
SVCreateTbReq* pCreateTbReq = NULL;
|
||||
|
||||
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
||||
taosMemoryFree(ctbName);
|
||||
goto _end;
|
||||
};
|
||||
|
||||
// set const
|
||||
pCreateTbReq->flags = 0;
|
||||
pCreateTbReq->type = TSDB_CHILD_TABLE;
|
||||
pCreateTbReq->ctb.suid = suid;
|
||||
|
||||
// set super table name
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
|
||||
|
||||
// set tag content
|
||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if (!tagArray) {
|
||||
taosMemoryFree(ctbName);
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
goto _end;
|
||||
char* ctbName = pDataBlock->info.parTbName;
|
||||
if (!ctbName[0]) {
|
||||
if (res == TSDB_CODE_SUCCESS) {
|
||||
memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName));
|
||||
} else {
|
||||
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
||||
memcpy(ctbName, tmp, strlen(tmp));
|
||||
memcpy(pTblMeta->tbName, tmp, strlen(tmp));
|
||||
taosMemoryFree(tmp);
|
||||
tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode),
|
||||
pDataBlock->info.id.groupId);
|
||||
}
|
||||
STagVal tagVal = {
|
||||
.cid = pTSchema->numOfCols + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
||||
};
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
||||
}
|
||||
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
tagArray = taosArrayDestroy(tagArray);
|
||||
if (pTag == NULL) {
|
||||
taosMemoryFree(ctbName);
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(ctbName);
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
goto _end;
|
||||
}
|
||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
// set tag name
|
||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||
strcpy(tagNameStr, "group_id");
|
||||
taosArrayPush(tagName, tagNameStr);
|
||||
pCreateTbReq->ctb.tagName = tagName;
|
||||
|
||||
// set table name
|
||||
pCreateTbReq->name = ctbName;
|
||||
ctbName = NULL;
|
||||
|
||||
tbData.pCreateTbReq = pCreateTbReq;
|
||||
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
if (res == TSDB_CODE_SUCCESS) {
|
||||
tbData.uid = pTblMeta->uid;
|
||||
} else {
|
||||
if (mr.me.type != TSDB_CHILD_TABLE) {
|
||||
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
|
||||
mr.me.type);
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(ctbName);
|
||||
continue;
|
||||
}
|
||||
taosMemoryFree(pTblMeta);
|
||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
|
||||
|
||||
if (mr.me.ctbEntry.suid != suid) {
|
||||
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
|
||||
", actual suid %" PRId64 "",
|
||||
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
||||
SVCreateTbReq* pCreateTbReq = NULL;
|
||||
|
||||
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
||||
goto _end;
|
||||
};
|
||||
|
||||
// set const
|
||||
pCreateTbReq->flags = 0;
|
||||
pCreateTbReq->type = TSDB_CHILD_TABLE;
|
||||
pCreateTbReq->ctb.suid = suid;
|
||||
|
||||
// set super table name
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
|
||||
|
||||
// set tag content
|
||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if (!tagArray) {
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
goto _end;
|
||||
}
|
||||
STagVal tagVal = {
|
||||
.cid = pTSchema->numOfCols + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.id.groupId,
|
||||
};
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
|
||||
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
tagArray = taosArrayDestroy(tagArray);
|
||||
if (pTag == NULL) {
|
||||
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
// set tag name
|
||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||
strcpy(tagNameStr, "group_id");
|
||||
taosArrayPush(tagName, tagNameStr);
|
||||
pCreateTbReq->ctb.tagName = tagName;
|
||||
|
||||
// set table name
|
||||
pCreateTbReq->name = taosStrdup(ctbName);
|
||||
|
||||
tbData.pCreateTbReq = pCreateTbReq;
|
||||
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
} else {
|
||||
if (mr.me.type != TSDB_CHILD_TABLE) {
|
||||
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
|
||||
mr.me.type);
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(pTblMeta);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mr.me.ctbEntry.suid != suid) {
|
||||
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
|
||||
", actual suid %" PRId64 "",
|
||||
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(pTblMeta);
|
||||
continue;
|
||||
}
|
||||
|
||||
tbData.uid = mr.me.uid;
|
||||
pTblMeta->uid = mr.me.uid;
|
||||
tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta);
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(ctbName);
|
||||
continue;
|
||||
}
|
||||
|
||||
tbData.uid = mr.me.uid;
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFreeClear(ctbName);
|
||||
}
|
||||
|
||||
// rows
|
||||
|
|
|
@ -16,12 +16,13 @@
|
|||
#include "tq.h"
|
||||
|
||||
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
||||
#define NO_POLL_CNT 5
|
||||
|
||||
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId);
|
||||
|
||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%" PRIx64 "-%d", streamId, taskId);
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||
return taosStrdup(buf);
|
||||
}
|
||||
|
||||
|
@ -234,6 +235,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool isHandleExecuting(STqHandle* pHandle){
|
||||
return 1 == atomic_load_8(&pHandle->exec);
|
||||
}
|
||||
|
||||
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||
char buf[80] = {0};
|
||||
|
@ -251,6 +256,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
return code;
|
||||
}
|
||||
|
||||
while(isHandleExecuting(pHandle)){
|
||||
tqInfo("sub is executing, pHandle:%p", pHandle);
|
||||
taosMsleep(5);
|
||||
}
|
||||
atomic_store_8(&pHandle->exec, 1);
|
||||
|
||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
if(code != 0) {
|
||||
|
@ -260,17 +271,23 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||
// lock
|
||||
taosWLockLatch(&pTq->lock);
|
||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data
|
||||
pHandle->noDataPollCnt = 0;
|
||||
// lock
|
||||
taosWLockLatch(&pTq->lock);
|
||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
atomic_store_8(&pHandle->exec, 0);
|
||||
return code;
|
||||
}
|
||||
else{
|
||||
pHandle->noDataPollCnt++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
|
||||
// NOTE: this pHandle->consumerId may have been changed already.
|
||||
|
||||
end:
|
||||
|
@ -279,45 +296,15 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
|
||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||
// taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
tDeleteMqDataRsp(&dataRsp);
|
||||
}
|
||||
=======
|
||||
tqInitDataRsp(&dataRsp, pRequest);
|
||||
atomic_store_8(&pHandle->exec, 0);
|
||||
|
||||
// lock
|
||||
taosWLockLatch(&pTq->lock);
|
||||
|
||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||
|
||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||
if (code == 0) {
|
||||
|
||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||
code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
// NOTE: this pHandle->consumerId may have been changed already.
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
|
||||
}
|
||||
|
||||
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
|
||||
" code:%d",
|
||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteMqDataRsp(&dataRsp);
|
||||
|
||||
>>>>>>> enh/3.0
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
|
||||
int code = 0;
|
||||
int code = 0;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
|
@ -330,10 +317,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
return code;
|
||||
}
|
||||
|
||||
while(isHandleExecuting(pHandle)){
|
||||
tqInfo("sub is executing, pHandle:%p", pHandle);
|
||||
taosMsleep(5);
|
||||
}
|
||||
atomic_store_8(&pHandle->exec, 1);
|
||||
|
||||
if (offset->type != TMQ_OFFSET__LOG) {
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
|
@ -341,16 +334,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
|
||||
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
|
||||
taosMemoryFree(metaRsp.metaRsp);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
|
||||
if (taosxRsp.blockNum > 0) {
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
goto end;
|
||||
}else {
|
||||
*offset = taosxRsp.rspOffset;
|
||||
}
|
||||
|
@ -361,9 +352,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
int64_t fetchVer = offset->version + 1;
|
||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||
if (pCkHead == NULL) {
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||
int totalRows = 0;
|
||||
|
@ -378,10 +369,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
goto end;
|
||||
}
|
||||
|
||||
SWalCont* pHead = &pCkHead->head;
|
||||
|
@ -392,10 +381,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
if (pHead->msgType != TDMT_VND_SUBMIT) {
|
||||
if(totalRows > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
goto end;
|
||||
}
|
||||
|
||||
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
|
||||
|
@ -403,17 +390,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
metaRsp.resMsgType = pHead->msgType;
|
||||
metaRsp.metaRspLen = pHead->bodyLen;
|
||||
metaRsp.metaRsp = pHead->body;
|
||||
if (tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||
goto end;
|
||||
}
|
||||
|
||||
// process data
|
||||
|
@ -423,29 +401,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
.ver = pHead->version,
|
||||
};
|
||||
|
||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
|
||||
pRequest->subKey);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return -1;
|
||||
code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
|
||||
if (code < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
goto end;
|
||||
} else {
|
||||
fetchVer++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
end:
|
||||
atomic_store_8(&pHandle->exec, 0);
|
||||
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||
|
|
|
@ -1437,12 +1437,12 @@ _return:
|
|||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
pRes->code = code;
|
||||
pRes->pRes = NULL;
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
||||
tstrerror(code));
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
TSWAP(pTask->res, ctx->pResList);
|
||||
taskDone = true;
|
||||
}
|
||||
ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname,
|
||||
tstrerror(code));
|
||||
}
|
||||
|
||||
if (pTask->res && taskDone) {
|
||||
|
|
|
@ -1484,14 +1484,23 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
|
||||
if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) {
|
||||
pValCtx[num++] = &pCtx[i];
|
||||
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
||||
p = &pCtx[i];
|
||||
void* data = taosHashGet(pSelectFuncs, pName, strlen(pName));
|
||||
if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) {
|
||||
p = NULL;
|
||||
break;
|
||||
} else {
|
||||
taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num));
|
||||
p = &pCtx[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
taosHashCleanup(pSelectFuncs);
|
||||
|
||||
if (p != NULL) {
|
||||
p->subsidiaries.pCtx = pValCtx;
|
||||
|
|
|
@ -112,7 +112,7 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
|
|||
clearStreamBlock(pTaskInfo->pRoot);
|
||||
}
|
||||
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
|
||||
|
@ -129,7 +129,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
pOperator->status = OP_NOT_OPENED;
|
||||
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
qDebug("s-task set source blocks:%d %s", (int32_t)numOfBlocks, id);
|
||||
qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks);
|
||||
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
|
||||
|
||||
if (type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||
|
@ -144,9 +144,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||
SPackedData tmp = {
|
||||
.pDataBlock = pDataBlock,
|
||||
};
|
||||
SPackedData tmp = { .pDataBlock = pDataBlock };
|
||||
taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||
}
|
||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||
|
@ -162,9 +160,11 @@ void doSetTaskId(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamScanInfo* pStreamScanInfo = pOperator->info;
|
||||
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
|
||||
if (pScanInfo->base.dataReader != NULL) {
|
||||
tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str);
|
||||
if (pStreamScanInfo->pTableScanOp != NULL) {
|
||||
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
|
||||
if (pScanInfo->base.dataReader != NULL) {
|
||||
tsdbReaderSetId(pScanInfo->base.dataReader, pTaskInfo->id.str);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
doSetTaskId(pOperator->pDownstream[0]);
|
||||
|
|
|
@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
// currently only the tbname pseudo column
|
||||
if (pInfo->numOfPseudoExpr > 0) {
|
||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
|
||||
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
|
||||
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
|
@ -1626,7 +1626,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
blockDataCleanup(pInfo->pRes);
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
|
||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
||||
while (tqNextBlockImpl(pInfo->tqReader, NULL)) {
|
||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||
continue;
|
||||
|
@ -2046,17 +2046,18 @@ FETCH_NEXT_BLOCK:
|
|||
return pInfo->pUpdateRes;
|
||||
}
|
||||
|
||||
const char* id = GET_TASKID(pTaskInfo);
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
|
||||
int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
|
||||
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
NEXT_SUBMIT_BLK:
|
||||
while (1) {
|
||||
if (pInfo->tqReader->msg.msgStr == NULL) {
|
||||
if (pInfo->validBlockIndex >= totBlockNum) {
|
||||
if (pInfo->validBlockIndex >= totalBlocks) {
|
||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||
doClearBufferedBlocks(pInfo);
|
||||
qDebug("stream scan return empty, consume block %d", totBlockNum);
|
||||
|
||||
qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
|
||||
void* buff = NULL;
|
||||
// int32_t len = streamScanOperatorEncode(pInfo, &buff);
|
||||
// if (len > 0) {
|
||||
|
@ -2068,17 +2069,18 @@ FETCH_NEXT_BLOCK:
|
|||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
|
||||
|
||||
qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
|
||||
if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
|
||||
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
|
||||
totBlockNum);
|
||||
qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
while (tqNextBlockImpl(pInfo->tqReader)) {
|
||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
|
||||
while (tqNextBlockImpl(pInfo->tqReader, id)) {
|
||||
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, id);
|
||||
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
@ -2099,6 +2101,7 @@ FETCH_NEXT_BLOCK:
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -2110,7 +2113,7 @@ FETCH_NEXT_BLOCK:
|
|||
pInfo->numOfExec++;
|
||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||
|
||||
qDebug("scan rows: %" PRId64, pBlockInfo->rows);
|
||||
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||
if (pBlockInfo->rows > 0) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
|
|
@ -2564,7 +2564,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
addRetriveWindow(delWins, pInfo);
|
||||
taosArrayAddAll(pInfo->pDelWins, delWins);
|
||||
if (pBlock->info.type != STREAM_CLEAR) {
|
||||
taosArrayAddAll(pInfo->pDelWins, delWins);
|
||||
}
|
||||
taosArrayDestroy(delWins);
|
||||
continue;
|
||||
}
|
||||
|
@ -2576,6 +2578,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
if (pBlock->info.type == STREAM_CLEAR) {
|
||||
pInfo->pDelRes->info.type = STREAM_CLEAR;
|
||||
} else {
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
}
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro
|
|||
}
|
||||
|
||||
static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) {
|
||||
if (subquery && pSetOp->opType == SET_OP_TYPE_UNION) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t index = 0;
|
||||
SNode* pProj = NULL;
|
||||
WHERE_EACH(pProj, pSetOp->pProjectionList) {
|
||||
|
|
|
@ -5349,7 +5349,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
|||
}
|
||||
|
||||
if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) {
|
||||
if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) {
|
||||
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||
}
|
||||
|
||||
|
@ -5374,6 +5375,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
|
|||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
|
||||
}
|
||||
|
||||
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||
}
|
||||
|
||||
if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) > TSDB_MAX_BYTES_PER_ROW) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
||||
}
|
||||
|
@ -8321,6 +8327,11 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
|
|||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||
}
|
||||
|
||||
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||
}
|
||||
|
||||
if (TSDB_MAX_COLUMNS == pTableMeta->tableInfo.numOfColumns) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS);
|
||||
}
|
||||
|
@ -8373,6 +8384,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
|
|||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL);
|
||||
}
|
||||
|
||||
if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) ||
|
||||
(TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
|
||||
}
|
||||
|
||||
if (pTableMeta->tableInfo.rowSize + pReq->colModBytes - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,14 @@
|
|||
#include "scalar.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
static void debugPrintNode(SNode* pNode) {
|
||||
char* pStr = NULL;
|
||||
nodesNodeToString(pNode, false, &pStr, NULL);
|
||||
printf("%s\n", pStr);
|
||||
taosMemoryFree(pStr);
|
||||
return;
|
||||
}
|
||||
|
||||
static void dumpQueryPlan(SQueryPlan* pPlan) {
|
||||
if (!tsQueryPlannerTrace) {
|
||||
return;
|
||||
|
|
|
@ -299,9 +299,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||
double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
|
||||
|
||||
qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
|
||||
pSubmitBlock->submit.ver, numOfBlocks, size);
|
||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size);
|
||||
|
||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
||||
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
||||
|
@ -345,6 +344,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
|
||||
|
||||
void* streamQueueNextItem(SStreamQueue* queue) {
|
||||
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
||||
if (dequeueFlag == STREAM_QUEUE__FAILED) {
|
||||
|
|
|
@ -159,7 +159,8 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
|||
return dst;
|
||||
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
|
||||
ASSERT(pMerged);
|
||||
// todo handle error
|
||||
|
||||
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
|
||||
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem);
|
||||
taosFreeQitem(dst);
|
||||
|
|
|
@ -15,7 +15,8 @@
|
|||
|
||||
#include "streamInc.h"
|
||||
|
||||
#define MAX_STREAM_EXEC_BATCH_NUM 10240
|
||||
// maximum allowed processed block batches. One block may include several submit blocks
|
||||
#define MAX_STREAM_EXEC_BATCH_NUM 128
|
||||
#define MIN_STREAM_EXEC_BATCH_NUM 16
|
||||
|
||||
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
||||
|
@ -66,7 +67,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
|||
|
||||
SArray* pBlockList = pMerged->submits;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
||||
qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
|
||||
qDebug("s-task:%s %p set submit input (merged), numOfblocks:%d", pTask->id.idStr, pTask, numOfBlocks);
|
||||
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
|
||||
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
|
||||
|
@ -259,9 +260,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
int32_t code = 0;
|
||||
while (1) {
|
||||
int32_t batchSize = 1;
|
||||
void* pInput = NULL;
|
||||
int16_t times = 0;
|
||||
|
||||
SStreamQueueItem* pInput = NULL;
|
||||
|
||||
// merge multiple input data if possible in the input queue.
|
||||
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
|
||||
|
||||
|
@ -271,10 +273,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
||||
times++;
|
||||
taosMsleep(1);
|
||||
qDebug("===stream===try agian batchSize:%d", batchSize);
|
||||
qDebug("===stream===try again batchSize:%d", batchSize);
|
||||
continue;
|
||||
}
|
||||
|
||||
qDebug("===stream===break batchSize:%d", batchSize);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -285,6 +288,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
break;
|
||||
}
|
||||
} else {
|
||||
// todo we need to sort the data block, instead of just appending into the array list.
|
||||
void* newRet = NULL;
|
||||
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
|
@ -294,6 +298,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
pInput = newRet;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
|
||||
qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -304,6 +309,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
if (pInput) {
|
||||
streamFreeQitem(pInput);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -312,14 +318,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK);
|
||||
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
|
||||
qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
|
||||
streamTaskOutput(pTask, pInput);
|
||||
streamTaskOutput(pTask, (SStreamDataBlock*)pInput);
|
||||
continue;
|
||||
}
|
||||
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize);
|
||||
qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
|
||||
|
||||
streamTaskExecImpl(pTask, pInput, pRes);
|
||||
|
||||
|
|
|
@ -217,8 +217,12 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
|
|||
return -1;
|
||||
}
|
||||
|
||||
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||
if (p == NULL) {
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
||||
}
|
||||
|
||||
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -357,15 +361,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||
if (p == NULL) {
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
||||
}
|
||||
|
||||
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
|
||||
|
||||
if (pTask->fillHistory) {
|
||||
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
|
||||
streamTaskCheckDownstream(pTask, ver);
|
||||
|
|
|
@ -93,10 +93,6 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
|
|||
|
||||
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
|
||||
qWarn("open stream state, %s", path);
|
||||
if (pTask == NULL) {
|
||||
qWarn("failed to open stream state, %s", path);
|
||||
return NULL;
|
||||
}
|
||||
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (pState == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -127,7 +123,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
|||
taosMemoryFree(pState);
|
||||
pState = NULL;
|
||||
}
|
||||
qWarn("open stream state2, %s", statePath);
|
||||
pState->pTdbState->pOwner = pTask;
|
||||
pState->pFileState = NULL;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
|
||||
|
|
|
@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
if (pTask->outputType == TASK_OUTPUT__TABLE) {
|
||||
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
|
||||
taosMemoryFree(pTask->tbSink.pTSchema);
|
||||
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
|
||||
}
|
||||
|
||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
|
|
|
@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) {
|
|||
wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
|
||||
}
|
||||
|
||||
int walRepairLogFileTs(SWal* pWal, bool* updateMeta) {
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
int32_t fileIdx = -1;
|
||||
int32_t lastCloseTs = 0;
|
||||
char fnameStr[WAL_FILE_LEN] = {0};
|
||||
|
||||
while (++fileIdx < sz - 1) {
|
||||
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
|
||||
if (pFileInfo->closeTs != -1) {
|
||||
lastCloseTs = pFileInfo->closeTs;
|
||||
continue;
|
||||
}
|
||||
|
||||
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
int32_t mtime = 0;
|
||||
if (taosStatFile(fnameStr, NULL, &mtime) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (updateMeta != NULL) *updateMeta = true;
|
||||
if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs;
|
||||
pFileInfo->closeTs = mtime;
|
||||
lastCloseTs = pFileInfo->closeTs;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool walLogEntriesComplete(const SWal* pWal) {
|
||||
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
|
||||
bool complete = true;
|
||||
|
@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
wError("failed to scan wal last ver since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
// remove the empty wal log, and its idx
|
||||
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
|
||||
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
// remove its meta entry
|
||||
taosArrayRemove(pWal->fileInfoSet, fileIdx);
|
||||
continue;
|
||||
// empty log file
|
||||
lastVer = pFileInfo->firstVer - 1;
|
||||
}
|
||||
|
||||
// update lastVer
|
||||
|
@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
|||
}
|
||||
(void)walAlignVersions(pWal);
|
||||
|
||||
// repair ts of files
|
||||
if (walRepairLogFileTs(pWal, &updateMeta) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// update meta file
|
||||
if (updateMeta) {
|
||||
(void)walSaveMeta(pWal);
|
||||
|
|
|
@ -74,18 +74,17 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
|||
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||
while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||
taosMsleep(1);
|
||||
appliedVer = walGetAppliedVer(pReader->pWal);
|
||||
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||
// taosMsleep(10);
|
||||
}
|
||||
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
||||
// endVer = TMIN(appliedVer, endVer);
|
||||
int64_t endVer = TMIN(appliedVer, committedVer);
|
||||
|
||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||
", applied index:%" PRId64,
|
||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
||||
while (fetchVer <= committedVer) {
|
||||
", applied index:%" PRId64", end index:%" PRId64,
|
||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
||||
while (fetchVer <= endVer) {
|
||||
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
if (ver == -1) {
|
||||
code = -1;
|
||||
goto END;
|
||||
};
|
||||
}
|
||||
|
||||
pWal->vers.snapshotVer = ver;
|
||||
int ts = taosGetTimestampSec();
|
||||
|
||||
ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1);
|
||||
|
||||
// compatible mode for refVer
|
||||
bool hasTopic = false;
|
||||
int64_t refVer = ver;
|
||||
int64_t refVer = INT64_MAX;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pWal->pRefHash, pIter);
|
||||
|
@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
SWalRef *pRef = *(SWalRef **)pIter;
|
||||
if (pRef->refVer == -1) continue;
|
||||
refVer = TMIN(refVer, pRef->refVer - 1);
|
||||
wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId);
|
||||
hasTopic = true;
|
||||
}
|
||||
// compatible mode
|
||||
if (pWal->cfg.retentionPeriod == 0 && hasTopic) {
|
||||
wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver);
|
||||
ver = TMIN(ver, refVer);
|
||||
}
|
||||
|
||||
// find files safe to delete
|
||||
int deleteCnt = 0;
|
||||
int64_t newTotSize = pWal->totSize;
|
||||
SWalFileInfo tmp;
|
||||
SWalFileInfo tmp = {0};
|
||||
tmp.firstVer = ver;
|
||||
// find files safe to delete
|
||||
SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
|
||||
|
||||
if (pInfo) {
|
||||
SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet);
|
||||
wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
|
||||
pInfo->lastVer);
|
||||
if (ver >= pInfo->lastVer) {
|
||||
wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver,
|
||||
pInfo->firstVer, pInfo->lastVer);
|
||||
ASSERT(ver <= pInfo->lastVer);
|
||||
if (ver == pInfo->lastVer) {
|
||||
pInfo++;
|
||||
wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
|
||||
pInfo->lastVer);
|
||||
}
|
||||
if (pInfo <= pLastFileInfo) {
|
||||
wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer,
|
||||
pInfo->lastVer);
|
||||
} else {
|
||||
wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
|
||||
}
|
||||
|
||||
// iterate files, until the searched result
|
||||
// delete according to file size or close time
|
||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64
|
||||
"), new tot size %" PRId64,
|
||||
pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize);
|
||||
if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) ||
|
||||
((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 &&
|
||||
iter->closeTs + pWal->cfg.retentionPeriod < ts))) {
|
||||
// delete according to file size or close time
|
||||
wDebug("vgId:%d, check pass", pWal->cfg.vgId);
|
||||
if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
|
||||
(pWal->cfg.retentionPeriod == 0 ||
|
||||
pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) {
|
||||
deleteCnt++;
|
||||
newTotSize -= iter->fileSize;
|
||||
taosArrayPush(pWal->toDeleteFiles, iter);
|
||||
}
|
||||
wDebug("vgId:%d, check not pass", pWal->cfg.vgId);
|
||||
}
|
||||
|
||||
UPDATE_META:
|
||||
// make new array, remove files
|
||||
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
|
||||
if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
|
||||
|
@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
|
||||
}
|
||||
}
|
||||
|
||||
// update meta
|
||||
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
|
||||
pWal->totSize = newTotSize;
|
||||
pWal->vers.verInSnapshotting = -1;
|
||||
|
||||
// save snapshot ver, commit ver
|
||||
code = walSaveMeta(pWal);
|
||||
if (code < 0) {
|
||||
goto END;
|
||||
|
@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
|
||||
// delete files
|
||||
deleteCnt = taosArrayGetSize(pWal->toDeleteFiles);
|
||||
wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
char fnameStr[WAL_FILE_LEN] = {0};
|
||||
pInfo = NULL;
|
||||
|
||||
for (int i = 0; i < deleteCnt; i++) {
|
||||
pInfo = taosArrayGet(pWal->toDeleteFiles, i);
|
||||
|
||||
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
||||
wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
|
||||
goto END;
|
||||
}
|
||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
|
||||
wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
if (pInfo != NULL) {
|
||||
wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId,
|
||||
deleteCnt, pInfo->lastVer, pInfo->closeTs);
|
||||
}
|
||||
taosArrayClear(pWal->toDeleteFiles);
|
||||
|
||||
END:
|
||||
|
|
|
@ -52,8 +52,9 @@ class ConfigureyCluster:
|
|||
dnode.addExtraCfg("secondEp", f"{hostname}:{startPort_sec}")
|
||||
|
||||
# configure dnoe of independent mnodes
|
||||
if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == True :
|
||||
dnode.addExtraCfg("supportVnodes", 1024)
|
||||
if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == "True" :
|
||||
tdLog.info("set mnode supportVnodes 0")
|
||||
dnode.addExtraCfg("supportVnodes", 0)
|
||||
# print(dnode)
|
||||
self.dnodes.append(dnode)
|
||||
return self.dnodes
|
||||
|
@ -71,6 +72,7 @@ class ConfigureyCluster:
|
|||
tdSql.init(conn.cursor())
|
||||
mnodeNums=int(mnodeNums)
|
||||
for i in range(2,mnodeNums+1):
|
||||
tdLog.info("create mnode on dnode %d"%i)
|
||||
tdSql.execute(" create mnode on dnode %d;"%i)
|
||||
|
||||
|
||||
|
|
|
@ -657,6 +657,34 @@ if $data20 != null then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =============== error for normal table
|
||||
sql create table tb2023(ts timestamp, f int);
|
||||
sql_error alter table tb2023 add column v varchar(16375);
|
||||
sql_error alter table tb2023 add column v varchar(16385);
|
||||
sql_error alter table tb2023 add column v varchar(33100);
|
||||
sql alter table tb2023 add column v varchar(16374);
|
||||
sql_error alter table tb2023 modify column v varchar(16375);
|
||||
sql desc tb2023
|
||||
sql alter table tb2023 drop column v
|
||||
sql_error alter table tb2023 add column v nchar(4094);
|
||||
sql alter table tb2023 add column v nchar(4093);
|
||||
sql_error alter table tb2023 modify column v nchar(4094);
|
||||
sql desc tb2023
|
||||
|
||||
print =============== error for super table
|
||||
sql create table stb2023(ts timestamp, f int) tags(t1 int);
|
||||
sql_error alter table stb2023 add column v varchar(16375);
|
||||
sql_error alter table stb2023 add column v varchar(16385);
|
||||
sql_error alter table stb2023 add column v varchar(33100);
|
||||
sql alter table stb2023 add column v varchar(16374);
|
||||
sql_error alter table stb2023 modify column v varchar(16375);
|
||||
sql desc stb2023
|
||||
sql alter table stb2023 drop column v
|
||||
sql_error alter table stb2023 add column v nchar(4094);
|
||||
sql alter table stb2023 add column v nchar(4093);
|
||||
sql_error alter table stb2023 modify column v nchar(4094);
|
||||
sql desc stb2023
|
||||
|
||||
print ======= over
|
||||
sql drop database d1
|
||||
sql select * from information_schema.ins_databases
|
||||
|
|
|
@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
|
|||
sql_error alter table tb modify column c2 binary(9);
|
||||
sql_error alter table tb modify column c2 binary(-9);
|
||||
sql_error alter table tb modify column c2 binary(0);
|
||||
sql alter table tb modify column c2 binary(17000);
|
||||
sql_error alter table tb modify column c2 binary(17000);
|
||||
sql_error alter table tb modify column c2 nchar(30);
|
||||
sql_error alter table tb modify column c3 double;
|
||||
sql_error alter table tb modify column c3 nchar(10);
|
||||
|
|
|
@ -25,4 +25,21 @@ if $data05 != @0021001@ then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql create table st (ts timestamp, f int) tags (t int);
|
||||
sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2)
|
||||
sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4)
|
||||
sql select count(*) from (select * from ct1 union all select * from ct2)
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 4 then
|
||||
return -1
|
||||
endi
|
||||
sql select count(*) from (select * from ct1 union select * from ct2)
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 4 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -29,6 +29,7 @@ class TDTestCase:
|
|||
self.stbname = 'stb'
|
||||
self.binary_length = 20 # the length of binary for column_dict
|
||||
self.nchar_length = 20 # the length of nchar for column_dict
|
||||
self.dbnames = ['db1', 'db2']
|
||||
self.column_dict = {
|
||||
'ts': 'timestamp',
|
||||
'col1': 'float',
|
||||
|
@ -57,21 +58,25 @@ class TDTestCase:
|
|||
def create_user(self):
|
||||
user_name = 'test'
|
||||
tdSql.execute(f'create user {user_name} pass "test"')
|
||||
tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}')
|
||||
tdSql.execute(f'grant read on {self.dbnames[0]}.{self.stbname} with t2 = "Beijing" to {user_name}')
|
||||
tdSql.execute(f'grant write on {self.dbnames[1]}.{self.stbname} with t1 = 2 to {user_name}')
|
||||
|
||||
def prepare_data(self):
|
||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||
for i in range(self.tbnum):
|
||||
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||
for j in self.values_list:
|
||||
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||
for db in self.dbnames:
|
||||
tdSql.execute(f"create database {db}")
|
||||
tdSql.execute(f"use {db}")
|
||||
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
|
||||
for i in range(self.tbnum):
|
||||
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
|
||||
for j in self.values_list:
|
||||
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
|
||||
|
||||
def user_privilege_check(self):
|
||||
def user_read_privilege_check(self, dbname):
|
||||
testconn = taos.connect(user='test', password='test')
|
||||
expectErrNotOccured = False
|
||||
|
||||
try:
|
||||
sql = "select count(*) from db.stb where t2 = 'Beijing'"
|
||||
sql = f"select count(*) from {dbname}.stb where t2 = 'Beijing'"
|
||||
res = testconn.query(sql)
|
||||
data = res.fetch_all()
|
||||
count = data[0][0]
|
||||
|
@ -85,11 +90,30 @@ class TDTestCase:
|
|||
tdLog.exit(f"{sql}, expect result doesn't match")
|
||||
pass
|
||||
|
||||
def user_write_privilege_check(self, dbname):
|
||||
testconn = taos.connect(user='test', password='test')
|
||||
expectErrNotOccured = False
|
||||
|
||||
try:
|
||||
sql = f"insert into {dbname}.stb_1 values(now, 1.1, 200, 0.3)"
|
||||
testconn.execute(sql)
|
||||
except BaseException:
|
||||
expectErrNotOccured = True
|
||||
|
||||
if expectErrNotOccured:
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
|
||||
else:
|
||||
pass
|
||||
|
||||
def user_privilege_error_check(self):
|
||||
testconn = taos.connect(user='test', password='test')
|
||||
expectErrNotOccured = False
|
||||
|
||||
sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"]
|
||||
sql_list = [f"alter talbe {self.dbnames[0]}.stb_1 set t2 = 'Wuhan'",
|
||||
f"insert into {self.dbnames[0]}.stb_1 values(now, 1.1, 200, 0.3)",
|
||||
f"drop table {self.dbnames[0]}.stb_1",
|
||||
f"select count(*) from {self.dbnames[1]}.stb"]
|
||||
|
||||
for sql in sql_list:
|
||||
try:
|
||||
|
@ -104,11 +128,11 @@ class TDTestCase:
|
|||
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
def run(self):
|
||||
self.prepare_data()
|
||||
self.create_user()
|
||||
self.user_privilege_check()
|
||||
self.user_read_privilege_check(self.dbnames[0])
|
||||
self.user_write_privilege_check(self.dbnames[1])
|
||||
self.user_privilege_error_check()
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -207,7 +207,7 @@ class ClusterComCheck:
|
|||
count+=1
|
||||
else:
|
||||
tdLog.debug(tdSql.queryResult)
|
||||
tdLog.exit("stop mnodes on dnode %d failed in 10s ")
|
||||
tdLog.exit(f"stop mnodes on dnode {offlineDnodeNo} failed in 10s ")
|
||||
|
||||
def check3mnode2off(self,mnodeNums=3):
|
||||
count=0
|
||||
|
@ -226,7 +226,45 @@ class ClusterComCheck:
|
|||
count+=1
|
||||
else:
|
||||
tdLog.debug(tdSql.queryResult)
|
||||
tdLog.exit("stop mnodes on dnode %d failed in 10s ")
|
||||
tdLog.exit("stop mnodes on dnode 2 or 3 failed in 10s")
|
||||
|
||||
def check_vgroups_status(self,vgroup_numbers=2,db_replica=3,count_number=10,db_name="db"):
|
||||
""" check vgroups status in 10s after db vgroups status is changed """
|
||||
vgroup_numbers = int(vgroup_numbers)
|
||||
self.db_replica = int(db_replica)
|
||||
tdLog.debug("start to check status of vgroups")
|
||||
count=0
|
||||
last_number=vgroup_numbers-1
|
||||
while count < count_number:
|
||||
time.sleep(1)
|
||||
tdSql.query(f"show {db_name}.vgroups;")
|
||||
if count == 0 :
|
||||
if tdSql.checkRows(vgroup_numbers) :
|
||||
tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" )
|
||||
else:
|
||||
tdLog.exit(f"vgroup number of {db_name} is not correct")
|
||||
if self.db_replica == 1 :
|
||||
if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[1][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader':
|
||||
ready_time= (count + 1)
|
||||
tdLog.success(f"all vgroups of {db_name} are leaders in {count + 1} s")
|
||||
return True
|
||||
count+=1
|
||||
elif self.db_replica == 3 :
|
||||
vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]]
|
||||
|
||||
vgroup_status_last=[tdSql.queryResult[last_number][4],tdSql.queryResult[last_number][6],tdSql.queryResult[last_number][8]]
|
||||
if vgroup_status_first.count('leader') == 1 and vgroup_status_first.count('follower') == 2:
|
||||
if vgroup_status_last.count('leader') == 1 and vgroup_status_last.count('follower') == 2:
|
||||
ready_time= (count + 1)
|
||||
tdLog.success(f"all vgroups of {db_name} are ready in {ready_time} s")
|
||||
return True
|
||||
count+=1
|
||||
else:
|
||||
tdLog.debug(tdSql.queryResult)
|
||||
tdLog.notice(f"all vgroups leader of {db_name} is selected {count}s ")
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno)
|
||||
tdLog.exit("%s(%d) failed " % args)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import TDDnodes
|
||||
from util.dnodes import TDDnode
|
||||
from util.cluster import *
|
||||
sys.path.append("./6-cluster")
|
||||
from clusterCommonCreate import *
|
||||
from clusterCommonCheck import clusterComCheck
|
||||
|
||||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading
|
||||
import time
|
||||
import inspect
|
||||
import ctypes
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.TDDnodes = None
|
||||
tdSql.init(conn.cursor())
|
||||
self.host = socket.gethostname()
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def _async_raise(self, tid, exctype):
|
||||
"""raises the exception, performs cleanup if needed"""
|
||||
if not inspect.isclass(exctype):
|
||||
exctype = type(exctype)
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
|
||||
if res == 0:
|
||||
raise ValueError("invalid thread id")
|
||||
elif res != 1:
|
||||
# """if it returns a number greater than one, you're in trouble,
|
||||
# and you should call it again with exc=NULL to revert the effect"""
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
|
||||
raise SystemError("PyThreadState_SetAsyncExc failed")
|
||||
|
||||
def stopThread(self,thread):
|
||||
self._async_raise(thread.ident, SystemExit)
|
||||
|
||||
|
||||
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'db0_0',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'stbNumbers': 2,
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbNum': 200,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
"rowsPerTbl": 1000,
|
||||
"batchNum": 5000
|
||||
}
|
||||
|
||||
dnodeNumbers=int(dnodeNumbers)
|
||||
mnodeNums=int(mnodeNums)
|
||||
vnodeNumbers = int(dnodeNumbers-mnodeNums)
|
||||
allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
|
||||
rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
|
||||
rowsall=rowsPerStb*paraDict['stbNumbers']
|
||||
dbNumbers = 1
|
||||
|
||||
tdLog.info("first check dnode and mnode")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
#check mnode status
|
||||
tdLog.info("check mnode status")
|
||||
clusterComCheck.checkMnodeStatus(mnodeNums)
|
||||
|
||||
# add some error operations and
|
||||
tdLog.info("Confirm the status of the dnode again")
|
||||
tdSql.error("create mnode on dnode 2")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
# create database and stable
|
||||
clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
|
||||
tdLog.info("Take turns stopping Mnodes ")
|
||||
|
||||
tdDnodes=cluster.dnodes
|
||||
stopcount =0
|
||||
threads=[]
|
||||
|
||||
# create stable:stb_0
|
||||
stableName= paraDict['stbName']
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
|
||||
#create child table:ctb_0
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
|
||||
#insert date
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
|
||||
for tr in threads:
|
||||
tr.start()
|
||||
for tr in threads:
|
||||
tr.join()
|
||||
|
||||
while stopcount < restartNumbers:
|
||||
tdLog.info(" restart loop: %d"%stopcount )
|
||||
if stopRole == "mnode":
|
||||
for i in range(mnodeNums):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "vnode":
|
||||
for i in range(vnodeNumbers):
|
||||
tdDnodes[i+mnodeNums].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i+mnodeNums].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "dnode":
|
||||
for i in range(dnodeNumbers):
|
||||
if i == 0 :
|
||||
stableName= '%s_%d'%(paraDict['stbName'],0)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
# newTdSql.execute('alter database db0_0 replica 3')
|
||||
clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
|
||||
tdDnodes[i].stoptaosd()
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
if i == 3 :
|
||||
TdSqlEx=tdCom.newTdSql()
|
||||
tdLog.info("alter database db0_0 replica 3")
|
||||
TdSqlEx.execute('alter database db0_0 replica 3')
|
||||
|
||||
|
||||
# dnodeNumbers don't include database of schema
|
||||
if clusterComCheck.checkDnodes(dnodeNumbers):
|
||||
tdLog.info("123")
|
||||
else:
|
||||
print("456")
|
||||
|
||||
self.stopThread(threads)
|
||||
tdLog.exit("one or more of dnodes failed to start ")
|
||||
# self.check3mnode()
|
||||
stopcount+=1
|
||||
|
||||
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
|
||||
|
||||
# tdSql.execute("use %s" %(paraDict["dbName"]))
|
||||
tdSql.query("show %s.stables"%(paraDict["dbName"]))
|
||||
tdSql.checkRows(paraDict["stbNumbers"])
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
|
||||
tdSql.query("select count(*) from %s"%stableName)
|
||||
if i == 0 :
|
||||
tdSql.checkData(0,0,rowsPerStb*2)
|
||||
else:
|
||||
tdSql.checkData(0,0,rowsPerStb)
|
||||
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=150)
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,206 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import TDDnodes
|
||||
from util.dnodes import TDDnode
|
||||
from util.cluster import *
|
||||
sys.path.append("./6-cluster")
|
||||
from clusterCommonCreate import *
|
||||
from clusterCommonCheck import clusterComCheck
|
||||
|
||||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading
|
||||
import time
|
||||
import inspect
|
||||
import ctypes
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.TDDnodes = None
|
||||
tdSql.init(conn.cursor())
|
||||
self.host = socket.gethostname()
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def _async_raise(self, tid, exctype):
|
||||
"""raises the exception, performs cleanup if needed"""
|
||||
if not inspect.isclass(exctype):
|
||||
exctype = type(exctype)
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
|
||||
if res == 0:
|
||||
raise ValueError("invalid thread id")
|
||||
elif res != 1:
|
||||
# """if it returns a number greater than one, you're in trouble,
|
||||
# and you should call it again with exc=NULL to revert the effect"""
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
|
||||
raise SystemError("PyThreadState_SetAsyncExc failed")
|
||||
|
||||
def stopThread(self,thread):
|
||||
self._async_raise(thread.ident, SystemExit)
|
||||
|
||||
|
||||
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'db0_0',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 3,
|
||||
'stbName': 'stb',
|
||||
'stbNumbers': 2,
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbNum': 200,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
"rowsPerTbl": 1000,
|
||||
"batchNum": 5000
|
||||
}
|
||||
|
||||
dnodeNumbers=int(dnodeNumbers)
|
||||
mnodeNums=int(mnodeNums)
|
||||
vnodeNumbers = int(dnodeNumbers-mnodeNums)
|
||||
allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
|
||||
rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
|
||||
rowsall=rowsPerStb*paraDict['stbNumbers']
|
||||
dbNumbers = 1
|
||||
|
||||
tdLog.info("first check dnode and mnode")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
#check mnode status
|
||||
tdLog.info("check mnode status")
|
||||
clusterComCheck.checkMnodeStatus(mnodeNums)
|
||||
|
||||
# add some error operations and
|
||||
tdLog.info("Confirm the status of the dnode again")
|
||||
tdSql.error("create mnode on dnode 2")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
# create database and stable
|
||||
clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
|
||||
tdLog.info("Take turns stopping Mnodes ")
|
||||
|
||||
tdDnodes=cluster.dnodes
|
||||
stopcount =0
|
||||
threads=[]
|
||||
|
||||
# create stable:stb_0
|
||||
stableName= paraDict['stbName']
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
|
||||
#create child table:ctb_0
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
|
||||
#insert date
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
|
||||
for tr in threads:
|
||||
tr.start()
|
||||
for tr in threads:
|
||||
tr.join()
|
||||
|
||||
while stopcount < restartNumbers:
|
||||
tdLog.info(" restart loop: %d"%stopcount )
|
||||
if stopRole == "mnode":
|
||||
for i in range(mnodeNums):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "vnode":
|
||||
for i in range(vnodeNumbers):
|
||||
tdDnodes[i+mnodeNums].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i+mnodeNums].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "dnode":
|
||||
for i in range(dnodeNumbers):
|
||||
tdDnodes[i].stoptaosd()
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
if i == 0 :
|
||||
stableName= '%s_%d'%(paraDict['stbName'],0)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
# newTdSql.execute('alter database db0_0 replica 3')
|
||||
clusterComCreate.alterStbMetaData(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
if i == 3 :
|
||||
TdSqlEx=tdCom.newTdSql()
|
||||
tdLog.info("alter database db0_0 replica 1")
|
||||
TdSqlEx.execute('alter database db0_0 replica 1')
|
||||
|
||||
|
||||
# dnodeNumbers don't include database of schema
|
||||
if clusterComCheck.checkDnodes(dnodeNumbers):
|
||||
tdLog.info("123")
|
||||
else:
|
||||
print("456")
|
||||
|
||||
self.stopThread(threads)
|
||||
tdLog.exit("one or more of dnodes failed to start ")
|
||||
# self.check3mnode()
|
||||
stopcount+=1
|
||||
|
||||
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
|
||||
|
||||
# tdSql.execute("use %s" %(paraDict["dbName"]))
|
||||
tdSql.query("show %s.stables"%(paraDict["dbName"]))
|
||||
tdSql.checkRows(paraDict["stbNumbers"])
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
|
||||
tdSql.query("select count(*) from %s"%stableName)
|
||||
if i == 0 :
|
||||
tdSql.checkData(0,0,rowsPerStb*2)
|
||||
else:
|
||||
tdSql.checkData(0,0,rowsPerStb)
|
||||
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=1,db_name=paraDict["dbName"],count_number=150)
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,222 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import TDDnodes
|
||||
from util.dnodes import TDDnode
|
||||
from util.cluster import *
|
||||
sys.path.append("./6-cluster")
|
||||
from clusterCommonCreate import *
|
||||
from clusterCommonCheck import clusterComCheck
|
||||
|
||||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading
|
||||
import time
|
||||
import inspect
|
||||
import ctypes
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.TDDnodes = None
|
||||
tdSql.init(conn.cursor())
|
||||
self.host = socket.gethostname()
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def _async_raise(self, tid, exctype):
|
||||
"""raises the exception, performs cleanup if needed"""
|
||||
if not inspect.isclass(exctype):
|
||||
exctype = type(exctype)
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
|
||||
if res == 0:
|
||||
raise ValueError("invalid thread id")
|
||||
elif res != 1:
|
||||
# """if it returns a number greater than one, you're in trouble,
|
||||
# and you should call it again with exc=NULL to revert the effect"""
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
|
||||
raise SystemError("PyThreadState_SetAsyncExc failed")
|
||||
|
||||
def stopThread(self,thread):
|
||||
self._async_raise(thread.ident, SystemExit)
|
||||
|
||||
|
||||
def insertData(self,countstart,countstop):
|
||||
# fisrt add data : db\stable\childtable\general table
|
||||
|
||||
for couti in range(countstart,countstop):
|
||||
tdLog.debug("drop database if exists db%d" %couti)
|
||||
tdSql.execute("drop database if exists db%d" %couti)
|
||||
print("create database if not exists db%d replica 1 duration 300" %couti)
|
||||
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
|
||||
tdSql.execute("use db%d" %couti)
|
||||
tdSql.execute(
|
||||
'''create table stb1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
tags (t1 int)
|
||||
'''
|
||||
)
|
||||
tdSql.execute(
|
||||
'''
|
||||
create table t1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
'''
|
||||
)
|
||||
for i in range(4):
|
||||
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
|
||||
|
||||
|
||||
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'db0_0',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'stbNumbers': 2,
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbNum': 1000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
"rowsPerTbl": 100,
|
||||
"batchNum": 5000
|
||||
}
|
||||
|
||||
dnodeNumbers = int(dnodeNumbers)
|
||||
mnodeNums = int(mnodeNums)
|
||||
vnodeNumbers = int(dnodeNumbers-mnodeNums)
|
||||
allctbNumbers = (paraDict['stbNumbers']*paraDict["ctbNum"])
|
||||
rowsPerStb = paraDict["ctbNum"]*paraDict["rowsPerTbl"]
|
||||
rowsall = rowsPerStb*paraDict['stbNumbers']
|
||||
dbNumbers = 1
|
||||
replica3 = 3
|
||||
tdLog.info("first check dnode and mnode")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
#check mnode status
|
||||
tdLog.info("check mnode status")
|
||||
clusterComCheck.checkMnodeStatus(mnodeNums)
|
||||
|
||||
# add some error operations and
|
||||
tdLog.info("Confirm the status of the dnode again")
|
||||
tdSql.error("create mnode on dnode 2")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
# create database and stable
|
||||
clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
|
||||
tdLog.info("Take turns stopping Mnodes ")
|
||||
|
||||
tdDnodes=cluster.dnodes
|
||||
stopcount =0
|
||||
threads=[]
|
||||
|
||||
# create stable:stb_0
|
||||
stableName= paraDict['stbName']
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
|
||||
#create child table:ctb_0
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
|
||||
#insert date
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
|
||||
for tr in threads:
|
||||
tr.start()
|
||||
TdSqlEx=tdCom.newTdSql()
|
||||
tdLog.info("alter database db0_0 replica 3")
|
||||
TdSqlEx.execute('alter database db0_0 replica 3')
|
||||
while stopcount < restartNumbers:
|
||||
tdLog.info(" restart loop: %d"%stopcount )
|
||||
if stopRole == "mnode":
|
||||
for i in range(mnodeNums):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "vnode":
|
||||
for i in range(vnodeNumbers):
|
||||
tdDnodes[i+mnodeNums].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i+mnodeNums].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "dnode":
|
||||
for i in range(dnodeNumbers):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;')
|
||||
# TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;')
|
||||
# tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);')
|
||||
# TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);')
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
# dnodeNumbers don't include database of schema
|
||||
if clusterComCheck.checkDnodes(dnodeNumbers):
|
||||
tdLog.info("123")
|
||||
else:
|
||||
print("456")
|
||||
|
||||
self.stopThread(threads)
|
||||
tdLog.exit("one or more of dnodes failed to start ")
|
||||
# self.check3mnode()
|
||||
stopcount+=1
|
||||
|
||||
for tr in threads:
|
||||
tr.join()
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
|
||||
|
||||
# tdSql.execute("use %s" %(paraDict["dbName"]))
|
||||
tdSql.query("show %s.stables"%(paraDict["dbName"]))
|
||||
tdSql.checkRows(paraDict["stbNumbers"])
|
||||
# for i in range(paraDict['stbNumbers']):
|
||||
# stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
|
||||
# tdSql.query("select count(*) from %s"%stableName)
|
||||
# tdSql.checkData(0,0,rowsPerStb)
|
||||
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=240)
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,196 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import TDDnodes
|
||||
from util.dnodes import TDDnode
|
||||
from util.cluster import *
|
||||
sys.path.append("./6-cluster")
|
||||
from clusterCommonCreate import *
|
||||
from clusterCommonCheck import clusterComCheck
|
||||
|
||||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading
|
||||
import time
|
||||
import inspect
|
||||
import ctypes
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.TDDnodes = None
|
||||
tdSql.init(conn.cursor())
|
||||
self.host = socket.gethostname()
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def _async_raise(self, tid, exctype):
|
||||
"""raises the exception, performs cleanup if needed"""
|
||||
if not inspect.isclass(exctype):
|
||||
exctype = type(exctype)
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
|
||||
if res == 0:
|
||||
raise ValueError("invalid thread id")
|
||||
elif res != 1:
|
||||
# """if it returns a number greater than one, you're in trouble,
|
||||
# and you should call it again with exc=NULL to revert the effect"""
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
|
||||
raise SystemError("PyThreadState_SetAsyncExc failed")
|
||||
|
||||
def stopThread(self,thread):
|
||||
self._async_raise(thread.ident, SystemExit)
|
||||
|
||||
|
||||
def insertData(self,countstart,countstop):
|
||||
# fisrt add data : db\stable\childtable\general table
|
||||
|
||||
for couti in range(countstart,countstop):
|
||||
tdLog.debug("drop database if exists db%d" %couti)
|
||||
tdSql.execute("drop database if exists db%d" %couti)
|
||||
print("create database if not exists db%d replica 1 duration 300" %couti)
|
||||
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
|
||||
tdSql.execute("use db%d" %couti)
|
||||
tdSql.execute(
|
||||
'''create table stb1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
tags (t1 int)
|
||||
'''
|
||||
)
|
||||
tdSql.execute(
|
||||
'''
|
||||
create table t1
|
||||
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
|
||||
'''
|
||||
)
|
||||
for i in range(4):
|
||||
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
|
||||
|
||||
|
||||
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'db0_0',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 3,
|
||||
'stbName': 'stb',
|
||||
'stbNumbers': 2,
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbNum': 1,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
"rowsPerTbl": 1,
|
||||
"batchNum": 5000
|
||||
}
|
||||
|
||||
dnodeNumbers=int(dnodeNumbers)
|
||||
mnodeNums=int(mnodeNums)
|
||||
vnodeNumbers = int(dnodeNumbers-mnodeNums)
|
||||
replica1 = 1
|
||||
replica3 = 3
|
||||
allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
|
||||
rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
|
||||
rowsall=rowsPerStb*paraDict['stbNumbers']
|
||||
dbNumbers = 1
|
||||
|
||||
tdLog.info("first check dnode and mnode")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
#check mnode status
|
||||
tdLog.info("check mnode status")
|
||||
clusterComCheck.checkMnodeStatus(mnodeNums)
|
||||
|
||||
# add some error operations and
|
||||
tdLog.info("Confirm the status of the dnode again")
|
||||
tdSql.error("create mnode on dnode 2")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
# create database and stable
|
||||
clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica'])
|
||||
tdLog.info("Take turns stopping Mnodes ")
|
||||
|
||||
tdDnodes=cluster.dnodes
|
||||
stopcount =0
|
||||
threads=[]
|
||||
|
||||
# create stable:stb_0
|
||||
stableName= paraDict['stbName']
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_stables(newTdSql, paraDict["dbName"],stableName,paraDict['stbNumbers'])
|
||||
#create child table:ctb_0
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
clusterComCreate.create_ctable(newTdSql, paraDict["dbName"],stableName,stableName, paraDict['ctbNum'])
|
||||
#insert date
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s_%d'%(paraDict['stbName'],i)
|
||||
newTdSql=tdCom.newTdSql()
|
||||
threads.append(threading.Thread(target=clusterComCreate.insert_data, args=(newTdSql, paraDict["dbName"],stableName,paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])))
|
||||
for tr in threads:
|
||||
tr.start()
|
||||
TdSqlEx=tdCom.newTdSql()
|
||||
tdLog.info(f"alter database db0_0 replica {replica1}")
|
||||
TdSqlEx.execute(f'alter database db0_0 replica {replica1}')
|
||||
for tr in threads:
|
||||
tr.join()
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
|
||||
|
||||
# tdSql.execute("use %s" %(paraDict["dbName"]))
|
||||
tdSql.query("show %s.stables"%(paraDict["dbName"]))
|
||||
tdSql.checkRows(paraDict["stbNumbers"])
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
|
||||
tdSql.query("select count(*) from %s"%stableName)
|
||||
tdSql.checkData(0,0,rowsPerStb)
|
||||
|
||||
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica1,db_name=paraDict["dbName"],count_number=20)
|
||||
sleep(5)
|
||||
tdLog.info(f"show transactions;alter database db0_0 replica {replica3};")
|
||||
TdSqlEx.execute(f'show transactions;')
|
||||
TdSqlEx.execute(f'alter database db0_0 replica {replica3};')
|
||||
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=replica3,db_name=paraDict["dbName"],count_number=120)
|
||||
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,191 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import os
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import TDDnodes
|
||||
from util.dnodes import TDDnode
|
||||
from util.cluster import *
|
||||
sys.path.append("./6-cluster")
|
||||
from clusterCommonCreate import *
|
||||
from clusterCommonCheck import clusterComCheck
|
||||
|
||||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading
|
||||
import time
|
||||
import inspect
|
||||
import ctypes
|
||||
|
||||
class TDTestCase:
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
self.TDDnodes = None
|
||||
tdSql.init(conn.cursor())
|
||||
self.host = socket.gethostname()
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def _async_raise(self, tid, exctype):
|
||||
"""raises the exception, performs cleanup if needed"""
|
||||
if not inspect.isclass(exctype):
|
||||
exctype = type(exctype)
|
||||
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
|
||||
if res == 0:
|
||||
raise ValueError("invalid thread id")
|
||||
elif res != 1:
|
||||
# """if it returns a number greater than one, you're in trouble,
|
||||
# and you should call it again with exc=NULL to revert the effect"""
|
||||
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
|
||||
raise SystemError("PyThreadState_SetAsyncExc failed")
|
||||
|
||||
def stopThread(self,thread):
|
||||
self._async_raise(thread.ident, SystemExit)
|
||||
|
||||
|
||||
def insertData(self,dbname,tableCount,rowsPerCount):
|
||||
# tableCount : create table number
|
||||
# rowsPerCount : rows per table
|
||||
# fisrt add data : db\stable\childtable\general table
|
||||
os.system(f"taosBenchmark -d {dbname} -n {tableCount} -t {rowsPerCount} -z 1 -k 10000 -y ")
|
||||
|
||||
|
||||
def fiveDnodeThreeMnode(self,dnodeNumbers,mnodeNums,restartNumbers,stopRole):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'db0_0',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'replica': 1,
|
||||
'stbName': 'stb',
|
||||
'stbNumbers': 2,
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbNum': 10000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
"rowsPerTbl": 10000,
|
||||
"batchNum": 5000
|
||||
}
|
||||
|
||||
dnodeNumbers=int(dnodeNumbers)
|
||||
mnodeNums=int(mnodeNums)
|
||||
vnodeNumbers = int(dnodeNumbers-mnodeNums)
|
||||
allctbNumbers=(paraDict['stbNumbers']*paraDict["ctbNum"])
|
||||
rowsPerStb=paraDict["ctbNum"]*paraDict["rowsPerTbl"]
|
||||
rowsall=rowsPerStb*paraDict['stbNumbers']
|
||||
dbNumbers = 1
|
||||
|
||||
tdLog.info("first check dnode and mnode")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
#check mnode status
|
||||
tdLog.info("check mnode status")
|
||||
clusterComCheck.checkMnodeStatus(mnodeNums)
|
||||
|
||||
# add some error operations and
|
||||
tdLog.info("Confirm the status of the dnode again")
|
||||
tdSql.error("create mnode on dnode 2")
|
||||
tdSql.query("select * from information_schema.ins_dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
|
||||
# create database and stable
|
||||
tdLog.info("Take turns stopping Mnodes ")
|
||||
|
||||
tdDnodes=cluster.dnodes
|
||||
stopcount =0
|
||||
threads=[]
|
||||
|
||||
# create stable:stb_0
|
||||
threads.append(threading.Thread(target=self.insertData, args=(paraDict["dbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"])))
|
||||
for tr in threads:
|
||||
tr.start()
|
||||
TdSqlEx=tdCom.newTdSql()
|
||||
tdLog.info("alter database db0_0 replica 3")
|
||||
TdSqlEx.execute('alter database db0_0 replica 3')
|
||||
while stopcount < restartNumbers:
|
||||
tdLog.info(" restart loop: %d"%stopcount )
|
||||
if stopRole == "mnode":
|
||||
for i in range(mnodeNums):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "vnode":
|
||||
for i in range(vnodeNumbers):
|
||||
tdDnodes[i+mnodeNums].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i+mnodeNums].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "dnode":
|
||||
for i in range(dnodeNumbers):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# tdLog.info('select cast(c2 as nchar(10)) from db0_0.stb_1;')
|
||||
# TdSqlEx.execute('select cast(c2 as nchar(10)) from db0_0.stb_1;')
|
||||
# tdLog.info('select avg(c1) from db0_0.stb_0 interval(10s);')
|
||||
# TdSqlEx.execute('select avg(c1) from db0_0.stb_0 interval(10s);')
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
# dnodeNumbers don't include database of schema
|
||||
if clusterComCheck.checkDnodes(dnodeNumbers):
|
||||
tdLog.info("123")
|
||||
else:
|
||||
print("456")
|
||||
|
||||
self.stopThread(threads)
|
||||
tdLog.exit("one or more of dnodes failed to start ")
|
||||
# self.check3mnode()
|
||||
stopcount+=1
|
||||
|
||||
for tr in threads:
|
||||
tr.join()
|
||||
clusterComCheck.checkDnodes(dnodeNumbers)
|
||||
clusterComCheck.checkDbRows(dbNumbers)
|
||||
# clusterComCheck.checkDb(dbNumbers,1,paraDict["dbName"])
|
||||
|
||||
# tdSql.execute("use %s" %(paraDict["dbName"]))
|
||||
tdSql.query("show %s.stables"%(paraDict["dbName"]))
|
||||
tdSql.checkRows(paraDict["stbNumbers"])
|
||||
for i in range(paraDict['stbNumbers']):
|
||||
stableName= '%s.%s_%d'%(paraDict["dbName"],paraDict['stbName'],i)
|
||||
tdSql.query("select count(*) from %s"%stableName)
|
||||
tdSql.checkData(0,0,rowsPerStb)
|
||||
clusterComCheck.check_vgroups_status(vgroup_numbers=paraDict["vgroups"],db_replica=3,db_name=paraDict["dbName"],count_number=240)
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -336,7 +336,7 @@ class TDTestCase:
|
|||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows >= expectrowcnt or totalConsumeRows <= 0:
|
||||
if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0:
|
||||
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
|
|
|
@ -226,12 +226,11 @@ class TDTestCase:
|
|||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 5
|
||||
pollDelay = 10
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
time.sleep(5)
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,\
|
||||
parameterDict["dbName"],\
|
||||
|
@ -307,7 +306,7 @@ class TDTestCase:
|
|||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 5
|
||||
pollDelay = 10
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
|
Loading…
Reference in New Issue