Merge pull request #26912 from taosdata/fix/create_tb
fix(stream): compare vg replica according to different db.
This commit is contained in:
commit
8be9337659
|
@ -88,17 +88,45 @@ void destroyStreamTaskIter(SStreamTaskIter* pIter) {
|
||||||
taosMemoryFree(pIter);
|
taosMemoryFree(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
|
static bool checkStatusForEachReplica(SVgObj *pVgroup) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
void *pIter = NULL;
|
if (!pVgroup->vnodeGid[i].syncRestore) {
|
||||||
SVgObj *pVgroup = NULL;
|
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
|
||||||
int32_t replica = -1; // do the replica check
|
return false;
|
||||||
int32_t code = 0;
|
}
|
||||||
|
|
||||||
*allReady = true;
|
ESyncState state = pVgroup->vnodeGid[i].syncState;
|
||||||
SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
|
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
|
||||||
|
state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
|
mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
|
||||||
|
state);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
SArray *pVgroupList = NULL;
|
||||||
|
SHashObj *pHash = NULL;
|
||||||
|
|
||||||
|
pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||||
if (pVgroupList == NULL) {
|
if (pVgroupList == NULL) {
|
||||||
return terrno;
|
mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
|
||||||
|
code = terrno;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
if (pHash == NULL) {
|
||||||
|
mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
|
||||||
|
code = terrno;
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -110,44 +138,39 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
|
||||||
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
|
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
|
||||||
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
if (replica == -1) {
|
int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
|
||||||
replica = pVgroup->replica;
|
if (pReplica == NULL) { // not exist, add it into hash map
|
||||||
} else {
|
code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
|
||||||
if (replica != pVgroup->replica) {
|
if (code) {
|
||||||
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
|
mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
|
||||||
pVgroup->vgId, pVgroup->replica, replica);
|
|
||||||
*allReady = false;
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
break;
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
goto _err; // take snapshot failed, and not all ready
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (*pReplica != pVgroup->replica) {
|
||||||
|
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
|
||||||
|
pVgroup->vgId, pVgroup->replica, *pReplica);
|
||||||
|
*allReady = false; // task snap success, but not all ready
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not all ready till now, no need to check the remaining vgroups.
|
// if not all ready till now, no need to check the remaining vgroups.
|
||||||
|
// but still we need to put the info of the existed vgroups into the snapshot list
|
||||||
if (*allReady) {
|
if (*allReady) {
|
||||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
*allReady = checkStatusForEachReplica(pVgroup);
|
||||||
if (!pVgroup->vnodeGid[i].syncRestore) {
|
|
||||||
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
|
|
||||||
*allReady = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
ESyncState state = pVgroup->vnodeGid[i].syncState;
|
|
||||||
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
|
|
||||||
state == TAOS_SYNC_STATE_CANDIDATE) {
|
|
||||||
mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups",
|
|
||||||
pVgroup->vgId, state);
|
|
||||||
*allReady = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void) epsetToStr(&entry.epset, buf, tListLen(buf));
|
(void)epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
|
||||||
void* p = taosArrayPush(pVgroupList, &entry);
|
void *p = taosArrayPush(pVgroupList, &entry);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
|
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
|
||||||
|
code = terrno;
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
}
|
}
|
||||||
|
@ -166,15 +189,21 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
|
||||||
code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||||
if (code) {
|
if (code) {
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
continue;
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void) epsetToStr(&entry.epset, buf, tListLen(buf));
|
(void)epsetToStr(&entry.epset, buf, tListLen(buf));
|
||||||
|
|
||||||
void* p = taosArrayPush(pVgroupList, &entry);
|
void *p = taosArrayPush(pVgroupList, &entry);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
|
code = terrno;
|
||||||
|
sdbRelease(pSdb, pObj);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
|
||||||
|
goto _err;
|
||||||
} else {
|
} else {
|
||||||
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||||
}
|
}
|
||||||
|
@ -183,6 +212,14 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
*pList = pVgroupList;
|
*pList = pVgroupList;
|
||||||
|
taosHashCleanup(pHash);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
*allReady = false;
|
||||||
|
taosArrayDestroy(pVgroupList);
|
||||||
|
taosHashCleanup(pHash);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -599,12 +636,9 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
|
||||||
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
|
||||||
void *pBuf = NULL;
|
void *pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
(void)streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
|
||||||
if (code) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -253,6 +253,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
|
||||||
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
|
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
streamMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -390,6 +391,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,6 +435,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
||||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
||||||
if (pReadyInfo == NULL) {
|
if (pReadyInfo == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -447,6 +450,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
|
||||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
|
||||||
if (pReadyInfo == NULL) {
|
if (pReadyInfo == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -830,6 +834,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||||
if (pNotSendList == NULL) {
|
if (pNotSendList == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
||||||
|
streamMutexUnlock(&pActiveInfo->lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -938,13 +943,14 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
|
||||||
|
|
||||||
streamMutexLock(&pInfo->lock);
|
streamMutexLock(&pInfo->lock);
|
||||||
if (!pInfo->dispatchTrigger) {
|
if (!pInfo->dispatchTrigger) {
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||||
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||||
if (pSendInfo == NULL) {
|
if (pSendInfo == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -964,11 +970,11 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
|
||||||
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
|
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(0);
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1028,6 +1034,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||||
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
streamMutexUnlock(&pInfo->lock);
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -907,8 +907,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
|
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
|
||||||
|
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId,
|
int32_t code = initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId,
|
||||||
&msg);
|
pInfo->checkpointId, &msg);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
|
code = tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -1199,14 +1199,17 @@ void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
||||||
*pList = NULL;
|
QRY_OPTR_CHECK(pList);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||||
if (pTaskList == NULL) {
|
if (pTaskList == NULL) {
|
||||||
stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
|
stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pList = pTaskList;
|
||||||
|
|
||||||
bool sendMsg = pMeta->sendMsgBeforeClosing;
|
bool sendMsg = pMeta->sendMsgBeforeClosing;
|
||||||
if (!sendMsg) {
|
if (!sendMsg) {
|
||||||
stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
|
stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
|
||||||
|
@ -1239,9 +1242,9 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamMetaSendHbHelper(pMeta);
|
(void)streamMetaSendHbHelper(pMeta);
|
||||||
pMeta->sendMsgBeforeClosing = false;
|
pMeta->sendMsgBeforeClosing = false;
|
||||||
return code;
|
return TSDB_CODE_SUCCESS; // always return true
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
|
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
|
||||||
|
|
|
@ -602,9 +602,9 @@ int32_t streamTaskStop(SStreamTask* pTask) {
|
||||||
stError("failed to handle STOP event, s-task:%s", id);
|
stError("failed to handle STOP event, s-task:%s", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
|
||||||
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||||
if (code) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s failed to kill task related query handle", id);
|
stError("s-task:%s failed to kill task related query handle", id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,8 +41,9 @@ class TDTestCase:
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
tdSql.execute("use test", queryTimes=100)
|
tdSql.execute("use test", queryTimes=100)
|
||||||
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
|
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
|
||||||
tdLog.debug("========create stream and insert data ok========")
|
time.sleep(5)
|
||||||
|
|
||||||
|
tdLog.debug("========create stream and insert data ok========")
|
||||||
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
|
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
|
||||||
rowCnt = tdSql.getRows()
|
rowCnt = tdSql.getRows()
|
||||||
results_meters = tdSql.queryResult
|
results_meters = tdSql.queryResult
|
||||||
|
|
Loading…
Reference in New Issue