diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 943fcbdb53..ab59fa5e47 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -82,7 +82,8 @@ int32_t create_stream() { /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( - pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)"); + pConn, + "create stream stream1 trigger max_delay 10s into outstb as select _wstartts, sum(k) from st1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 288248422b..3f05c27453 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -38,8 +38,10 @@ typedef struct SReadHandle { SMsgCb* pMsgCb; } SReadHandle; -#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 -#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 +enum { + STREAM_DATA_TYPE_SUBMIT_BLOCK = 1, + STREAM_DATA_TYPE_SSDATA_BLOCK = 2, +}; typedef enum { OPTR_EXEC_MODEL_BATCH = 0x1, @@ -102,7 +104,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, * @param tversion * @return */ -int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion); +int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, + int32_t* tversion); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 960794792b..2c9d66a828 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,6 +58,7 @@ enum { enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, + STREAM_INPUT__TRIGGER, STREAM_INPUT__CHECKPOINT, }; @@ -85,6 +86,11 @@ typedef struct { int8_t type; } SStreamCheckpoint; +typedef struct { + int8_t type; + SSDataBlock* pBlock; +} SStreamTrigger; + enum { STREAM_QUEUE__SUCESS = 1, STREAM_QUEUE__FAILED, @@ -98,6 +104,9 @@ typedef struct { int8_t status; } SStreamQueue; +int32_t streamInit(); +void streamCleanUp(); + SStreamQueue* streamQueueOpen(); void streamQueueClose(SStreamQueue* queue); @@ -220,6 +229,11 @@ enum { TASK_INPUT_TYPE__DATA_BLOCK, }; +enum { + TASK_TRIGGER_STATUS__IN_ACTIVE = 1, + TASK_TRIGGER_STATUS__ACTIVE, +}; + struct SStreamTask { int64_t streamId; int32_t taskId; @@ -262,8 +276,16 @@ struct SStreamTask { SStreamQueue* inputQueue; SStreamQueue* outputQueue; + // trigger + int8_t triggerStatus; + int64_t triggerParam; + void* timer; + // application storage // void* ahandle; + + // msg handle + SMsgCb* pMsgCb; }; SStreamTask* tNewSStreamTask(int64_t streamId); @@ -292,6 +314,13 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); + } else if (pItem->type == STREAM_INPUT__TRIGGER) { + taosWriteQitem(pTask->inputQueue->queue, pItem); + } + + if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0 && + pTask->triggerStatus == TASK_TRIGGER_STATUS__IN_ACTIVE) { + atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__ACTIVE); } // TODO: back pressure @@ -370,7 +399,8 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); -int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); +int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); +int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamTaskRun(SStreamTask* pTask); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a369b81d26..2d21b3531a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,7 +24,7 @@ extern "C" { #include "tdef.h" #include "tmsgcb.h" -#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 typedef uint64_t SyncNodeId; @@ -44,14 +44,6 @@ typedef enum { TAOS_SYNC_STATE_ERROR = 103, } ESyncState; -typedef enum { - TAOS_SYNC_PROPOSE_SUCCESS = 0, - TAOS_SYNC_PROPOSE_NOT_LEADER = 1, - TAOS_SYNC_ONLY_ONE_REPLICA = 2, - TAOS_SYNC_NOT_IN_NEW_CONFIG = 3, - TAOS_SYNC_OTHER_ERROR = 100, -} ESyncProposeCode; - typedef enum { TAOS_SYNC_FSM_CB_SUCCESS = 0, TAOS_SYNC_FSM_CB_OTHER_ERROR = 1, diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 30cbbbeb11..6fc84e023d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -411,6 +411,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) +#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911) +#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 10c6c3623a..dd900e6045 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -992,6 +992,90 @@ CREATE_MSG_FAIL: return -1; } +bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { + bool set = false; + + int32_t topicNumGet = taosArrayGetSize(pRsp->topics); + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, + topicNumGet); + + SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); + if (newTopics == NULL) { + return false; + } + + SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); + if (pHash == NULL) { + taosArrayDestroy(newTopics); + return false; + } + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t i = 0; i < topicNumCur; i++) { + // find old topic + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); + if (pTopicCur->vgs) { + int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); + tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur); + if (vgNumCur == 0) break; + for (int32_t j = 0; j < vgNumCur; j++) { + SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); + sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); + tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey); + taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); + } + break; + } + } + + for (int32_t i = 0; i < topicNumGet; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + topic.schema = pTopicEp->schema; + taosHashClear(pHash); + topic.topicName = strdup(pTopicEp->topic); + tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); + + tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName); + + int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgNumGet; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); + int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); + int64_t offset = tmq->resetOffsetCfg; + if (pOffset != NULL) { + offset = *pOffset; + } + + tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); + SMqClientVg clientVg = { + .pollCnt = 0, + .currentOffset = offset, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet, + .vgStatus = TMQ_VG_STATUS__IDLE, + .vgSkipCnt = 0, + }; + taosArrayPush(topic.vgs, &clientVg); + set = true; + } + taosArrayPush(newTopics, &topic); + } + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + taosHashCleanup(pHash); + tmq->clientTopics = newTopics; + + if (taosArrayGetSize(tmq->clientTopics) == 0) + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); + else + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + + atomic_store_32(&tmq->epoch, epoch); + return set; +} + bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ bool set = false; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6dd5d1622c..2e543149c0 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -554,8 +554,8 @@ typedef struct { SVgObj fixedSinkVg; int64_t smaId; // 0 for unused int8_t trigger; - int32_t triggerParam; - int64_t waterMark; + int64_t triggerParam; + int64_t watermark; char* sql; char* physicalPlan; SArray* tasks; // SArray> diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 15cd9fa043..b5d22cb7a5 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 1e2c61ff89..6eac91bb43 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -33,8 +33,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; - if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1; - if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; @@ -85,8 +85,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; - if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1; - if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 59599ee134..00118eac1e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -380,17 +380,19 @@ void mndStop(SMnode *pMnode) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - int32_t code = TAOS_SYNC_OTHER_ERROR; + int32_t code = 0; if (!syncEnvIsStart()) { mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); if (pSyncNode == NULL) { mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } char logBuf[512] = {0}; @@ -451,7 +453,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_OTHER_ERROR; + code = -1; } } else { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { @@ -492,10 +494,13 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - code = TAOS_SYNC_OTHER_ERROR; + code = -1; } } + if (code != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } return code; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a3b7754a4e..5b745f3cea 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -387,6 +387,9 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // input pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + // trigger + pFinalTask->triggerParam = pStream->triggerParam; + // dispatch if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask) < 0) { qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index b1738464b6..fa6616ae96 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -561,6 +561,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sql = pCreate->sql; streamObj.createdBy = STREAM_CREATED_BY__SMA; streamObj.smaId = smaObj.uid; + streamObj.watermark = 0; + streamObj.trigger = STREAM_TRIGGER_AT_ONCE; if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); @@ -583,7 +585,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; // if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 96d199fcb6..4a7da8f2d1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -235,16 +235,15 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 } if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString((SNode*)pPlan, false, pStr, NULL); + code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL); } nodesDestroyNode(pAst); - nodesDestroyNode((SNode*)pPlan); + nodesDestroyNode((SNode *)pPlan); terrno = code; return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, - STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -258,7 +257,6 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast // free nodesDestroyNode(pAst); - #if 0 printf("|"); for (int i = 0; i < pStream->outputSchema.nCols; i++) { @@ -268,7 +266,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast #endif - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, triggerType, watermark, &pStream->physicalPlan)) { + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, pStream->trigger, pStream->watermark, &pStream->physicalPlan)) { mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); return -1; } @@ -391,7 +389,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ streamObj.trigger = pCreate->triggerType; - streamObj.waterMark = pCreate->watermark; + streamObj.watermark = pCreate->watermark; + streamObj.triggerParam = pCreate->maxDelay; if (streamObj.targetSTbName[0]) { pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName); @@ -409,7 +408,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pCreate->triggerType, pCreate->watermark, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -566,7 +565,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pStream->waterMark, false); + colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 9940037356..63708aef8a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -234,9 +234,9 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak); if (code == 0) { tsem_wait(&pMgmt->syncSem); - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { terrno = TSDB_CODE_APP_NOT_READY; - } else if (code == TAOS_SYNC_OTHER_ERROR) { + } else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } else { terrno = TSDB_CODE_APP_ERROR; @@ -257,13 +257,13 @@ void mndSyncStart(SMnode *pMnode) { syncStart(pMgmt->sync); mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); -/* - if (pMgmt->standby) { - syncStartStandBy(pMgmt->sync); - } else { - syncStart(pMgmt->sync); - } -*/ + /* + if (pMgmt->standby) { + syncStartStandBy(pMgmt->sync); + } else { + syncStart(pMgmt->sync); + } + */ } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9f34ae39c0..7d2dfeeba5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -375,6 +375,8 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL; + pTask->pMsgCb = &pTq->pVnode->msgCb; + // exec if (pTask->execType != TASK_EXEC__NONE) { // expand runners @@ -406,9 +408,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); ASSERT(pTask->tbSink.pTSchema); } + + streamSetupTrigger(pTask); + tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId); - taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); + taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); return 0; FAIL: @@ -431,7 +436,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { while (1) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; - SStreamTask* pTask = (SStreamTask*)pIter; + SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; if (!failed) { @@ -439,7 +444,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { continue; } - if (streamTriggerByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) { + if (streamLaunchByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) { continue; } } else { @@ -459,7 +464,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb); return 0; } @@ -473,7 +478,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); int32_t taskId = req.taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SRpcMsg rsp = { .info = pMsg->info, .code = 0, @@ -485,7 +490,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); return 0; } @@ -493,7 +498,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); return 0; } @@ -501,7 +506,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessRecoverRsp(pTask, pRsp); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cd64bc8a9c..34ff9ffa0f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -296,7 +296,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { } int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - int32_t ret = TAOS_SYNC_OTHER_ERROR; + int32_t ret = 0; if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); @@ -381,15 +381,18 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { tmsgSendRsp(&rsp); } else { vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); - ret = TAOS_SYNC_OTHER_ERROR; + ret = -1; } syncNodeRelease(pSyncNode); } else { vError("==vnodeProcessSyncReq== error syncEnv stop"); - ret = TAOS_SYNC_OTHER_ERROR; + ret = -1; } + if (ret != 0) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + } return ret; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 17c2c186be..975dff9fb6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -98,7 +98,8 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { if (code == 0) { vnodeAccumBlockMsg(pVnode, pMsg->msgType); - } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { + + } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { SEpSet newEpSet = {0}; syncGetEpSet(pVnode->sync, &newEpSet); SEp *pEp = &newEpSet.eps[newEpSet.inUse]; @@ -247,29 +248,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { - return 0; -} +int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } -int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { - return 0; -} +int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } -int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { - return 0; -} +int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } -int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { - return 0; -} +int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; } -int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { - return 0; -} +int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; } -int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { - return 0; -} +int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; } static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 62890b8326..277516686b 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1899,7 +1899,7 @@ _return: void ctgUpdateThreadUnexpectedStopped(void) { - if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); + if (!atomic_load_8((int8_t*)&gCtgMgmt.exit) && CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); } void ctgCleanupCacheQueue(void) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fd62849e56..bebfc75f17 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -19,7 +19,8 @@ #include "tdatablock.h" #include "vnode.h" -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid, + char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -43,6 +44,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu if (pInfo->blockType == 0) { pInfo->blockType = type; } else if (pInfo->blockType != type) { + ASSERT(0); return TSDB_CODE_QRY_APP_ERROR; } @@ -51,7 +53,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } - } else { + } else if (type == STREAM_DATA_TYPE_SSDATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; @@ -62,6 +64,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } + } else { + ASSERT(0); } return TSDB_CODE_SUCCESS; @@ -83,7 +87,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); + int32_t code = + doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { @@ -162,7 +167,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo pInfo = pInfo->pDownstream[0]; } - int32_t code = 0; + int32_t code = 0; SStreamBlockScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); @@ -178,9 +183,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo return code; } -int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) { +int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, + int32_t* tversion) { ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; *sversion = pTaskInfo->schemaVer.sversion; *tversion = pTaskInfo->schemaVer.tversion; @@ -196,4 +202,4 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab } return 0; -} \ No newline at end of file +} diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 833f6db0c3..62a0c663b6 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1682,7 +1682,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, @@ -1717,7 +1717,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = topBotFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 94208043e7..de17c0f32c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2385,6 +2385,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; @@ -2421,6 +2422,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; pInfo->hasResult = true; @@ -2451,6 +2456,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; pInfo->hasResult = true; @@ -2474,6 +2483,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; @@ -2501,6 +2511,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); @@ -2520,6 +2534,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; pInfo->hasResult = true; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 27746fff78..8aaf4953de 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -23,6 +23,13 @@ extern "C" { #endif +typedef struct { + int8_t inited; + void* timer; +} SStreamGlobalEnv; + +static SStreamGlobalEnv streamEnv; + int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d3828d9ee4..6dcbfad957 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -14,8 +14,74 @@ */ #include "streamInc.h" +#include "ttimer.h" -int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { +int32_t streamInit() { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2); + if (old != 2) break; + } + + if (old == 0) { + streamEnv.timer = taosTmrInit(10000, 100, 10000, "STREAM"); + if (streamEnv.timer == NULL) { + atomic_store_8(&streamEnv.inited, 0); + return -1; + } + atomic_store_8(&streamEnv.inited, 1); + } + return 0; +} + +void streamCleanUp() { + int8_t old; + while (1) { + old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2); + if (old != 2) break; + } + + if (old == 1) { + taosTmrCleanUp(streamEnv.timer); + atomic_store_8(&streamEnv.inited, 0); + } +} + +void streamTriggerByTimer(void* param, void* tmrId) { + SStreamTask* pTask = (void*)param; + + if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { + SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); + if (trigger == NULL) return; + trigger->type = STREAM_INPUT__TRIGGER; + trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (trigger->pBlock == NULL) { + taosFreeQitem(trigger); + return; + } + trigger->pBlock->info.type = STREAM_GET_ALL; + + atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); + + streamTaskInput(pTask, (SStreamQueueItem*)trigger); + streamLaunchByWrite(pTask, pTask->nodeId, pTask->pMsgCb); + } + + taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); +} + +int32_t streamSetupTrigger(SStreamTask* pTask) { + if (pTask->triggerParam != 0) { + if (streamInit() < 0) { + return -1; + } + pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); + pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; + } + return 0; +} + +int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { int8_t execStatus = atomic_load_8(&pTask->status); if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fe1a857743..04428136ae 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,15 +20,17 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) void* exec = pTask->exec.executor; // set input - if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { + SStreamQueueItem* pItem = (SStreamQueueItem*)data; + if (pItem->type == STREAM_INPUT__TRIGGER) { + SStreamTrigger* pTrigger = (SStreamTrigger*)data; + qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false); + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; - ASSERT(pSubmit->type == STREAM_INPUT__DATA_SUBMIT); - + ASSERT(pTask->inputType == STREAM_INPUT__DATA_SUBMIT); qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); - } else if (pTask->inputType == STREAM_INPUT__DATA_BLOCK) { + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)data; - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - + ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK); SArray* blocks = pBlock->blocks; qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5d8546bcb9..7a7d9d15ad 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -72,6 +72,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; /*if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ } + if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; /*tEndEncode(pEncoder);*/ return pEncoder->pos; @@ -121,6 +122,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1;*/ if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; } + if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; /*tEndDecode(pDecoder);*/ return 0; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9db9550681..953294e489 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -149,12 +149,14 @@ void syncStop(int64_t rid) { int32_t syncSetStandby(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } // state change @@ -177,7 +179,8 @@ int32_t syncSetStandby(int64_t rid) { int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); @@ -201,7 +204,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg if (!IamInNew) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + return -1; } char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -219,7 +223,8 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); @@ -246,7 +251,8 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { if (!IamInNew) { sError("sync reconfig error, not in new config"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + return -1; } char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg); @@ -272,13 +278,15 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { int32_t syncLeaderTransfer(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); if (pSyncNode->peersNum == 0) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0]; @@ -291,7 +299,8 @@ int32_t syncLeaderTransfer(int64_t rid) { int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } ASSERT(rid == pSyncNode->rid); int32_t ret = 0; @@ -299,7 +308,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { if (pSyncNode->replicaNum == 1) { sError("only one replica, cannot drop leader"); taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return TAOS_SYNC_ONLY_ONE_REPLICA; + terrno = TSDB_CODE_SYN_ONE_REPLICA; + return -1; } SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); @@ -538,11 +548,12 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + int32_t ret = 0; SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return TAOS_SYNC_OTHER_ERROR; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; } assert(rid == pSyncNode->rid); sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); @@ -553,7 +564,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { } int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; + int32_t ret = 0; sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType)); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -567,14 +578,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - ret = TAOS_SYNC_PROPOSE_SUCCESS; + ret = 0; } else { + ret = -1; + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("syncPropose pSyncNode->FpEqMsg is NULL"); } syncClientRequestDestroy(pSyncMsg); } else { + ret = -1; + terrno = TSDB_CODE_SYN_NOT_LEADER; sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); - ret = TAOS_SYNC_PROPOSE_NOT_LEADER; } return ret; @@ -945,9 +959,13 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { // timer control -------------- int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); - atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + if (syncEnvIsStart()) { + taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); + } else { + sError("sync env is stop, syncNodeStartPingTimer"); + } return ret; } @@ -961,10 +979,14 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; - pSyncNode->electTimerMS = ms; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pElectTimer); - atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + if (syncEnvIsStart()) { + pSyncNode->electTimerMS = ms; + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); + } else { + sError("sync env is stop, syncNodeStartElectTimer"); + } return ret; } @@ -998,9 +1020,13 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t ret = 0; - taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pHeartbeatTimer); - atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + if (syncEnvIsStart()) { + taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + } else { + sError("sync env is stop, syncNodeStartHeartbeatTimer"); + } return ret; } @@ -1720,14 +1746,25 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (code != 0) { + sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); + rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); + return; + } } else { sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pPingTimer); + if (syncEnvIsStart()) { + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sError("sync env is stop, syncNodeEqPingTimer"); + } + } else { sTrace("==syncNodeEqPingTimer== pingTimerLogicClock:%" PRIu64 ", pingTimerLogicClockUser:%" PRIu64 "", pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); @@ -1743,16 +1780,26 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); if (pSyncNode->FpEqMsg != NULL) { - pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (code != 0) { + sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); + rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); + return; + } } else { - sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL"); + sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); // reset timer ms - pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pElectTimer); + if (syncEnvIsStart()) { + pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); + taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } else { + sError("sync env is stop, syncNodeEqElectTimer"); + } } else { sTrace("==syncNodeEqElectTimer== electTimerLogicClock:%" PRIu64 ", electTimerLogicClockUser:%" PRIu64 "", pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); @@ -1774,19 +1821,19 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (code != 0) { sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); + syncTimeoutDestroy(pSyncMsg); return; } - } else { - sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL"); + sError("syncNodeEqHeartbeatTimer FpEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); - if (gSyncEnv != NULL) { - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, - &pSyncNode->pHeartbeatTimer); + if (syncEnvIsStart()) { + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); } else { - sError("sync env is already stop"); + sError("sync env is stop, syncNodeEqHeartbeatTimer"); } } else { sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64 diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 10b54d0aa4..3ee2b458d6 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -338,7 +338,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 1e64a8a6f7..b1dc53d804 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -251,7 +251,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index bf9e34fffb..d955e64f81 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -188,7 +188,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader", s, alreadySend); } else { assert(ret == 0); diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index ebcd7368cc..21049454f4 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -391,7 +391,7 @@ int main(int argc, char** argv) { if (alreadySend < writeRecordNum) { SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); int32_t ret = syncPropose(rid, pRpcMsg, false); - if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + if (ret == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) { sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait); } else { assert(ret == 0); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 2357f760d1..ec90e5a9b9 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -174,7 +174,11 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) } void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { - return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); + if (pInput) { + return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); + } else { + return NULL; + } } void* taosArrayPop(SArray* pArray) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8e6284d2cb..621f947b64 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -413,6 +413,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") // wal diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 3d9efea938..a10eaf1fb5 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -134,7 +134,7 @@ class TDTestCase: parameterDict['cfg'] = cfgPath prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() - time.sleep(2) + prepareEnvThread.join() # wait stb ready while 1: @@ -245,6 +245,7 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() + prepareEnvThread.join() # wait db ready while 1: @@ -371,6 +372,7 @@ class TDTestCase: prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) prepareEnvThread.start() + prepareEnvThread.join() # wait db ready while 1: diff --git a/tests/system-test/test-all.bat b/tests/system-test/test-all.bat index 076be6563a..0929b1fc6e 100644 --- a/tests/system-test/test-all.bat +++ b/tests/system-test/test-all.bat @@ -61,7 +61,8 @@ goto :eof set tt=%1 set tt=%tt:.= % set tt=%tt::= % -set index=1 +set tt=%tt: 0= % +set /a index=1 for %%a in (%tt%) do ( if !index! EQU 1 ( set /a hh=%%a @@ -75,5 +76,5 @@ for %%a in (%tt%) do ( ) set /a index=index+1 ) -set /a _timeTemp=(%hh%*60+%mm%)*60+%ss% || echo hh:%hh% mm:%mm% ss:%ss% +set /a _timeTemp=(%hh%*60+%mm%)*60+%ss% goto :eof \ No newline at end of file diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 47e0cefb52..5a68f548da 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -21,6 +21,7 @@ import base64 import json import platform import socket +import threading from distutils.log import warn as printf from fabric2 import Connection sys.path.append("../pytest") @@ -30,6 +31,13 @@ from util.cases import * import taos +def checkRunTimeError(): + import win32gui + while 1: + time.sleep(1) + hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") + if hwnd: + os.system("TASKKILL /F /IM taosd.exe") if __name__ == "__main__": @@ -42,9 +50,6 @@ if __name__ == "__main__": logSql = True stop = 0 restart = False - windows = 0 - if platform.system().lower() == 'windows': - windows = 1 updateCfgDict = {} execCmd = "" opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:', [ @@ -159,7 +164,9 @@ if __name__ == "__main__": host = masterIp tdLog.info("Procedures for tdengine deployed in %s" % (host)) - if windows: + if platform.system().lower() == 'windows': + if (masterIp == "" and not fileName[0:12] == "0-others\\udf"): + threading.Thread(target=checkRunTimeError,daemon=True).start() tdCases.logSql(logSql) tdLog.info("Procedures for testing self-deployment") tdDnodes.init(deployPath, masterIp)