Merge pull request #25598 from taosdata/fix/3_liaohj
refactor(stream): opt stream sink perf.
commit f2bc93b78d
@@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = {
     {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
     {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
+    {.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
     {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
 };
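Note: each SSysDbTableSchema entry above defines one column of the information_schema.ins_streams system table, so this hunk adds a single new varchar column, checkpoint_interval, between sink_quota and checkpoint_backup. Its value is filled in by the mndRetrieveStream hunk further down, and the expected column count in the Python test at the end moves up by one to match.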
@@ -1213,27 +1213,31 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {

   SMDropStreamReq dropReq = {0};
   if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
+    mError("invalid drop stream msg recv, discarded");
     terrno = TSDB_CODE_INVALID_MSG;
     return -1;
   }

-  pStream = mndAcquireStream(pMnode, dropReq.name);
+  mDebug("recv drop stream:%s msg", dropReq.name);
+
+  pStream = mndAcquireStream(pMnode, dropReq.name);
   if (pStream == NULL) {
     if (dropReq.igNotExists) {
-      mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
+      mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
       sdbRelease(pMnode->pSdb, pStream);
       tFreeMDropStreamReq(&dropReq);
       return 0;
     } else {
       terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
-      mError("stream:%s not exist failed to drop", dropReq.name);
+      mError("stream:%s not exist failed to drop it", dropReq.name);
       tFreeMDropStreamReq(&dropReq);
       return -1;
     }
   }

   if (pStream->smaId != 0) {
+    mDebug("stream:%s, uid:0x%"PRIx64" try to drop sma related stream", dropReq.name, pStream->uid);

     void *pIter = NULL;
     SSmaObj *pSma = NULL;
     pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
@@ -1241,13 +1245,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
       if (pSma && pSma->uid == pStream->smaId) {
         sdbRelease(pMnode->pSdb, pSma);
         sdbRelease(pMnode->pSdb, pStream);

         sdbCancelFetch(pMnode->pSdb, pIter);
         tFreeMDropStreamReq(&dropReq);
         terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED;

+        mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma",
+               dropReq.name, pStream->uid, tstrerror(terrno));
         return -1;
       }
-      if (pSma) sdbRelease(pMnode->pSdb, pSma);
-      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
+
+      if (pSma) {
+        sdbRelease(pMnode->pSdb, pSma);
+      }
+
+      pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
     }
   }
@@ -1267,7 +1279,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {

   STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream");
   if (pTrans == NULL) {
-    mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
+    mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, pStream->uid, terrstr());
     sdbRelease(pMnode->pSdb, pStream);
     tFreeMDropStreamReq(&dropReq);
     return -1;
@@ -1277,7 +1289,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {

   // drop all tasks
   if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
-    mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
+    mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, terrstr());
     sdbRelease(pMnode->pSdb, pStream);
     mndTransDrop(pTrans);
     tFreeMDropStreamReq(&dropReq);
@@ -1303,10 +1315,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
   // kill the related checkpoint trans
   int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
   if (transId != 0) {
-    mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
+    mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid);
     mndKillTransImpl(pMnode, transId, pStream->sourceDb);
   }

+  mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name,
+         pStream->uid, transId);
+
   removeStreamTasksInBuf(pStream, &execInfo);

   SName name = {0};
@@ -1488,6 +1503,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
   pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
   colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);

+  // sink_quota
   char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
   sinkQuota[0] = '0';
   char dstStr[20] = {0};
@@ -1495,6 +1511,14 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
   pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
   colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);

+  // checkpoint interval
+  char tmp[20 + VARSTR_HEADER_SIZE] = {0};
+  sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval);
+  varDataSetLen(tmp, strlen(varDataVal(tmp)));
+
+  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
+  colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false);
+
   // checkpoint backup type
   char backup[20 + VARSTR_HEADER_SIZE] = {0};
   STR_TO_VARSTR(backup, "none")
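The new checkpoint-interval cell above is written with varDataVal/varDataSetLen, which build a length-prefixed varstr in tmp. Below is a minimal standalone sketch of that layout, assuming the usual TDengine convention of a 2-byte length header (VARSTR_HEADER_SIZE) in front of the payload; the buffer size and helper names mirror the diff, while the macro definitions and the interval value here are illustrative assumptions, not the library's actual code:

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    #define VARSTR_HEADER_SIZE  sizeof(uint16_t)               /* assumed 2-byte length prefix */
    #define varDataVal(v)       ((char*)(v) + VARSTR_HEADER_SIZE)
    #define varDataSetLen(v, l) (*(uint16_t*)(v) = (uint16_t)(l))

    int main(void) {
      char tmp[20 + VARSTR_HEADER_SIZE] = {0};
      int  tsStreamCheckpointInterval = 300;                   /* illustrative value, in seconds */
      sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval);
      varDataSetLen(tmp, strlen(varDataVal(tmp)));
      printf("len=%u payload=%s\n", *(uint16_t*)tmp, varDataVal(tmp));  /* len=7 payload=300 sec */
      return 0;
    }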
@@ -333,8 +333,9 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
   }

   while (j < newLen && k < oldLen) {
-    SRow* pNewRow = taosArrayGetP(pNew->aRowP, j);
-    SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k);
+    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
+    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
+
     if (pNewRow->ts < pOldRow->ts) {
       taosArrayPush(pFinal, &pNewRow);
       j += 1;
@@ -373,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
   }

   while (j < newLen) {
-    SRow* pRow = taosArrayGetP(pNew->aRowP, j++);
+    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
     taosArrayPush(pFinal, &pRow);
   }

   while (k < oldLen) {
-    SRow* pRow = taosArrayGetP(pExisted->aRowP, k++);
+    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
     taosArrayPush(pFinal, &pRow);
   }
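These two hunks are the sink-side perf change from the commit title: taosArrayGetP is an out-of-line call that re-checks the index on every row, while TARRAY_GET_ELEM expands inline to plain pointer arithmetic, so the per-row cost in this merge loop drops to a load. Below is a minimal sketch of the two access paths; the SArray layout is an illustrative subset of the real struct, TARRAY_GET_ELEM follows the conventional definition of returning the address of slot idx, and taosArrayGetP_sketch is a stand-in for the real function, not its actual body:

    #include <stddef.h>
    #include <stdio.h>

    typedef struct SArray {        /* illustrative subset of the real SArray */
      size_t size;                 /* number of elements in use */
      size_t elemSize;             /* bytes per element */
      char*  pData;                /* contiguous element storage */
    } SArray;

    /* Inline access: no function call, no per-row index check (caller guarantees idx < size). */
    #define TARRAY_GET_ELEM(pArray, idx) ((void*)((pArray)->pData + (idx) * (pArray)->elemSize))

    /* Function-call flavor sketched after taosArrayGetP: checked access that
       dereferences the slot as a stored pointer. */
    static void* taosArrayGetP_sketch(const SArray* pArray, size_t idx) {
      if (idx >= pArray->size) return NULL;   /* per-row branch the macro avoids */
      return *(void**)(pArray->pData + idx * pArray->elemSize);
    }

    int main(void) {
      int  x = 7, *slot = &x;
      SArray a = {.size = 1, .elemSize = sizeof(int*), .pData = (char*)&slot};
      /* Both forms recover the same stored pointer: */
      printf("%d %d\n", **(int**)TARRAY_GET_ELEM(&a, 0), *(int*)taosArrayGetP_sketch(&a, 0));
      return 0;
    }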
@@ -35,6 +35,7 @@ static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId,
 static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId);
 static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
                               int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id);
+static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId);
 static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId);

 // check status
@@ -383,6 +384,8 @@ int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskI
 }

 void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
+  const char* id = pTask->id.idStr;
+
   SStreamTaskCheckReq req = {
       .streamId = pTask->id.streamId,
       .upstreamTaskId = pTask->id.taskId,
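Note: caching pTask->id.idStr in the local id up front is what the two log-site hunks below build on; both re-send stDebug calls switch from pTask->id.idStr to the shorter local, and the shuffle-dispatch one also folds its trailing p->reqId argument onto one line.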
@@ -397,7 +400,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
     setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId);

     stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
-            pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
+            id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);

     streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
   } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
|
||||||
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
|
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
|
||||||
p->reqId);
|
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@@ -524,7 +526,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
 // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread.
 // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
 // of restart in timer thread will result in a dead lock.
-static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
+int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
   SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
   if (pRunReq == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
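One subtlety on the static removal above: the forward declaration added in the @@ -35,6 +35,7 @@ hunk still says static, and in C a function first declared with internal linkage keeps that linkage even when a later definition omits the keyword (C11 6.2.2), so the function stays file-local; the edit simply lets the definition match a declaration that now appears earlier in the file. A tiny self-contained illustration:

    #include <stdio.h>

    static int helper(void);   /* first declaration: internal linkage */

    int helper(void) {         /* no 'static', but linkage follows the prior declaration */
      return 42;
    }

    int main(void) {
      printf("%d\n", helper());  /* prints 42 */
      return 0;
    }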
@@ -614,8 +616,8 @@ void rspMonitorFn(void* param, void* tmrId) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug(
         "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
-        "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
-        id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
+        "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
+        id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);

     streamTaskCompleteCheckRsp(pInfo, false, id);
     taosThreadMutexUnlock(&pInfo->checkInfoLock);
@@ -630,9 +632,9 @@ void rspMonitorFn(void* param, void* tmrId) {
   if (pInfo->stopCheckProcess == 1) {
     int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug(
-        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
-        "fault:%d, timeout:%d, ready:%d ref:%d",
-        id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
+        "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
+        "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
+        id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);

     streamTaskCompleteCheckRsp(pInfo, false, id);
     taosThreadMutexUnlock(&pInfo->checkInfoLock);
@@ -222,7 +222,7 @@ class TDTestCase:

        tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
        tdLog.info(len(tdSql.queryResult))
-        tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254))
+        tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255))

        tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
        tdSql.checkEqual(54, len(tdSql.queryResult))
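Note: the expected ins_columns count for information_schema moves from 253 to 254, matching the single checkpoint_interval column added to streamSchema at the top of this commit. Since range(254, 255) contains only 254, the check still accepts exactly one value.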