refactor(stream): stream deploy and state transfer
This commit is contained in:
parent
15308ad140
commit
2fbc8e6306
|
@ -241,6 +241,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_MON_MSG)
|
TD_NEW_MSG_SEG(TDMT_MON_MSG)
|
||||||
|
|
|
@ -46,7 +46,7 @@ enum {
|
||||||
TASK_STATUS__DROPPING,
|
TASK_STATUS__DROPPING,
|
||||||
TASK_STATUS__FAIL,
|
TASK_STATUS__FAIL,
|
||||||
TASK_STATUS__STOP,
|
TASK_STATUS__STOP,
|
||||||
TASK_STATUS__RECOVER_DOWNSTREAM,
|
TASK_STATUS__WAIT_DOWNSTREAM,
|
||||||
TASK_STATUS__RECOVER_PREPARE,
|
TASK_STATUS__RECOVER_PREPARE,
|
||||||
TASK_STATUS__RECOVER1,
|
TASK_STATUS__RECOVER1,
|
||||||
TASK_STATUS__RECOVER2,
|
TASK_STATUS__RECOVER2,
|
||||||
|
@ -332,7 +332,10 @@ typedef struct SStreamTask {
|
||||||
SStreamState* pState;
|
SStreamState* pState;
|
||||||
|
|
||||||
// do not serialize
|
// do not serialize
|
||||||
int32_t recoverWaitingChild;
|
int32_t recoverTryingDownstream;
|
||||||
|
int32_t recoverWaitingUpstream;
|
||||||
|
int64_t checkReqId;
|
||||||
|
SArray* checkReqIds; // shuffle
|
||||||
|
|
||||||
} SStreamTask;
|
} SStreamTask;
|
||||||
|
|
||||||
|
@ -418,7 +421,10 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t upstreamNodeId;
|
||||||
|
int32_t upstreamTaskId;
|
||||||
|
int32_t downstreamNodeId;
|
||||||
|
int32_t downstreamTaskId;
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
} SStreamDispatchRsp;
|
} SStreamDispatchRsp;
|
||||||
|
|
||||||
|
@ -440,6 +446,27 @@ typedef struct {
|
||||||
int32_t rspToTaskId;
|
int32_t rspToTaskId;
|
||||||
} SStreamRetrieveRsp;
|
} SStreamRetrieveRsp;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t reqId;
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t upstreamNodeId;
|
||||||
|
int32_t upstreamTaskId;
|
||||||
|
int32_t downstreamNodeId;
|
||||||
|
int32_t downstreamTaskId;
|
||||||
|
int32_t childId;
|
||||||
|
} SStreamTaskCheckReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t reqId;
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t upstreamNodeId;
|
||||||
|
int32_t upstreamTaskId;
|
||||||
|
int32_t downstreamNodeId;
|
||||||
|
int32_t downstreamTaskId;
|
||||||
|
int32_t childId;
|
||||||
|
int8_t status;
|
||||||
|
} SStreamTaskCheckRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead msgHead;
|
SMsgHead msgHead;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
|
@ -455,47 +482,6 @@ typedef struct {
|
||||||
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
|
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
|
||||||
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
|
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
|
||||||
|
|
||||||
#if 0
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t taskId;
|
|
||||||
int32_t upstreamTaskId;
|
|
||||||
int32_t upstreamNodeId;
|
|
||||||
} SStreamTaskRecoverReq;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t rspTaskId;
|
|
||||||
int32_t reqTaskId;
|
|
||||||
int8_t inputStatus;
|
|
||||||
} SStreamTaskRecoverRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq);
|
|
||||||
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq);
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp);
|
|
||||||
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t taskId;
|
|
||||||
} SMStreamTaskRecoverReq;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t taskId;
|
|
||||||
} SMStreamTaskRecoverRsp;
|
|
||||||
|
|
||||||
int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq);
|
|
||||||
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq);
|
|
||||||
|
|
||||||
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
|
|
||||||
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
|
|
||||||
|
|
||||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
|
||||||
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t downstreamTaskId;
|
int32_t downstreamTaskId;
|
||||||
|
@ -509,20 +495,18 @@ typedef struct {
|
||||||
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
|
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
|
||||||
} SStreamRecoverDownstreamRsp;
|
} SStreamRecoverDownstreamRsp;
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
|
||||||
|
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
|
||||||
|
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
|
||||||
|
|
||||||
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
|
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
|
||||||
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
|
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
|
||||||
|
|
||||||
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
|
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
|
||||||
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
|
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t streamId;
|
|
||||||
int32_t taskId;
|
|
||||||
int32_t waitingRspCnt;
|
|
||||||
int32_t totReq;
|
|
||||||
SArray* info; // SArray<SArray<SStreamCheckpointInfo>*>
|
|
||||||
} SStreamRecoverStatus;
|
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
||||||
|
@ -533,7 +517,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
||||||
|
@ -544,6 +528,10 @@ int32_t streamSchedExec(SStreamTask* pTask);
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||||
|
|
||||||
// recover and fill history
|
// recover and fill history
|
||||||
|
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
|
||||||
|
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
|
||||||
|
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq);
|
||||||
|
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
|
||||||
// common
|
// common
|
||||||
int32_t streamSetParamForRecover(SStreamTask* pTask);
|
int32_t streamSetParamForRecover(SStreamTask* pTask);
|
||||||
int32_t streamRestoreParam(SStreamTask* pTask);
|
int32_t streamRestoreParam(SStreamTask* pTask);
|
||||||
|
|
|
@ -635,9 +635,12 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
|
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
|
||||||
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
|
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
|
||||||
|
|
||||||
|
// stream
|
||||||
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
|
||||||
#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x4101)
|
#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x5101)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1756,7 +1756,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
|
|
||||||
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
rspObj = tmqHandleAllRsp(tmq, timeout, false);
|
||||||
if (rspObj) {
|
if (rspObj) {
|
||||||
tscDebug("consumer:%" PRId64 ", return rsp", tmq->consumerId);
|
tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj);
|
||||||
return (TAOS_RES*)rspObj;
|
return (TAOS_RES*)rspObj;
|
||||||
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||||
tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
|
tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
|
||||||
|
|
|
@ -58,7 +58,7 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
dTrace("msg:%p, get from snode-stream queue", pMsg);
|
dTrace("msg:%p, get from snode-stream queue", pMsg);
|
||||||
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
|
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
dGError("snd, msg:%p failed to process stream since %s", pMsg, terrstr(code));
|
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
|
||||||
smSendRsp(pMsg, terrno);
|
smSendRsp(pMsg, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -403,7 +403,6 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -412,6 +411,9 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -86,7 +86,8 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code));
|
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||||
|
terrstr(code));
|
||||||
vmSendRsp(pMsg, code);
|
vmSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,7 +219,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); }
|
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
|
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
|
||||||
|
|
||||||
|
|
|
@ -155,7 +155,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
_OVER:
|
_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
dGTrace("msg:%p, failed to process since %s", pMsg, terrstr());
|
dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
|
||||||
|
|
||||||
if (IsReq(pRpc)) {
|
if (IsReq(pRpc)) {
|
||||||
SRpcMsg rsp = {.code = code, .info = pRpc->info};
|
SRpcMsg rsp = {.code = code, .info = pRpc->info};
|
||||||
|
|
|
@ -705,7 +705,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-stream");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
|
||||||
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
|
|
@ -612,14 +612,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO replace assert with error check
|
if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
|
||||||
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
|
mError("mq rebalance internal error");
|
||||||
|
}
|
||||||
|
|
||||||
// if add more consumer to balanced subscribe,
|
// if add more consumer to balanced subscribe,
|
||||||
// possibly no vg is changed
|
// possibly no vg is changed
|
||||||
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
|
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
|
||||||
|
|
||||||
// TODO replace assert with error check
|
|
||||||
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
|
||||||
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
|
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
|
||||||
}
|
}
|
||||||
|
|
|
@ -231,10 +231,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t taskId = pRsp->taskId;
|
int32_t taskId = pRsp->upstreamTaskId;
|
||||||
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
streamProcessDispatchRsp(pTask, pRsp);
|
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -158,7 +158,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
|
bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
|
||||||
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow);
|
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||||
|
@ -240,7 +240,7 @@ bool tqNextDataBlock(STqReader *pReader);
|
||||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||||
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
|
||||||
|
|
||||||
void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// sma
|
// sma
|
||||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||||
|
|
|
@ -150,19 +150,19 @@ typedef struct {
|
||||||
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
|
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
|
||||||
int tsdbClose(STsdb** pTsdb);
|
int tsdbClose(STsdb** pTsdb);
|
||||||
int32_t tsdbBegin(STsdb* pTsdb);
|
int32_t tsdbBegin(STsdb* pTsdb);
|
||||||
int32_t tsdbCommit(STsdb* pTsdb);
|
int32_t tsdbCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
SSubmitBlkRsp* pRsp);
|
SSubmitBlkRsp* pRsp);
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
|
@ -183,13 +183,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
// tq-stream
|
// tq-stream
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
|
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
|
||||||
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
// int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
|
||||||
// int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
|
|
||||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -896,6 +896,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->startVer = ver;
|
pTask->startVer = ver;
|
||||||
|
|
||||||
// expand executor
|
// expand executor
|
||||||
|
if (pTask->fillHistory) {
|
||||||
|
pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
|
@ -911,9 +915,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
ASSERT(pTask->exec.executor);
|
ASSERT(pTask->exec.executor);
|
||||||
|
|
||||||
if (pTask->fillHistory) {
|
|
||||||
pTask->taskStatus = TASK_STATUS__RECOVER_PREPARE;
|
|
||||||
}
|
|
||||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||||
if (pTask->pState == NULL) {
|
if (pTask->pState == NULL) {
|
||||||
|
@ -947,11 +948,90 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
streamSetupTrigger(pTask);
|
streamSetupTrigger(pTask);
|
||||||
|
|
||||||
tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
|
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId,
|
||||||
pTask->selfChildId);
|
pTask->selfChildId, pTask->taskLevel);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
char* msgStr = pMsg->pCont;
|
||||||
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
SStreamTaskCheckReq req;
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, msgBody, msgLen);
|
||||||
|
tDecodeSStreamTaskCheckReq(&decoder, &req);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
int32_t taskId = req.downstreamTaskId;
|
||||||
|
SStreamTaskCheckRsp rsp = {
|
||||||
|
.reqId = req.reqId,
|
||||||
|
.streamId = req.streamId,
|
||||||
|
.childId = req.childId,
|
||||||
|
.downstreamNodeId = req.downstreamNodeId,
|
||||||
|
.downstreamTaskId = req.downstreamTaskId,
|
||||||
|
.upstreamNodeId = req.upstreamNodeId,
|
||||||
|
.upstreamTaskId = req.upstreamTaskId,
|
||||||
|
};
|
||||||
|
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||||
|
if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
|
||||||
|
rsp.status = 1;
|
||||||
|
} else {
|
||||||
|
rsp.status = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||||
|
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
int32_t code;
|
||||||
|
int32_t len;
|
||||||
|
tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
|
||||||
|
if (code < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
|
||||||
|
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
tEncoderInit(&encoder, (uint8_t*)abuf, len);
|
||||||
|
tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
SRpcMsg rspMsg = {
|
||||||
|
.code = 0,
|
||||||
|
.pCont = buf,
|
||||||
|
.contLen = sizeof(SMsgHead) + len,
|
||||||
|
.info = pMsg->info,
|
||||||
|
};
|
||||||
|
|
||||||
|
tmsgSendRsp(&rspMsg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
|
int32_t code;
|
||||||
|
SStreamTaskCheckRsp rsp;
|
||||||
|
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||||
|
code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
|
||||||
|
if (code < 0) {
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||||
|
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
|
|
||||||
|
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return streamProcessTaskCheckRsp(pTask, &rsp, version);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -982,37 +1062,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
|
||||||
|
|
||||||
// 3.go through recover steps to fill history
|
// 3.go through recover steps to fill history
|
||||||
if (pTask->fillHistory) {
|
if (pTask->fillHistory) {
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
streamTaskCheckDownstream(pTask, version);
|
||||||
streamSetParamForRecover(pTask);
|
|
||||||
streamSourceRecoverPrepareStep1(pTask, version);
|
|
||||||
|
|
||||||
SStreamRecoverStep1Req req;
|
|
||||||
streamBuildSourceRecover1Req(pTask, &req);
|
|
||||||
int32_t len = sizeof(SStreamRecoverStep1Req);
|
|
||||||
|
|
||||||
void* serializedReq = rpcMallocCont(len);
|
|
||||||
if (serializedReq == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(serializedReq, &req, len);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
|
||||||
.contLen = len,
|
|
||||||
.pCont = serializedReq,
|
|
||||||
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg) < 0) {
|
|
||||||
/*ASSERT(0);*/
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
|
||||||
streamSetParamForRecover(pTask);
|
|
||||||
streamAggRecoverPrepare(pTask);
|
|
||||||
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1268,7 +1318,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
||||||
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) {
|
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||||
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
|
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1335,10 +1385,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t taskId = pRsp->taskId;
|
int32_t taskId = ntohl(pRsp->upstreamTaskId);
|
||||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||||
|
tqDebug("recv dispatch rsp, code: %x", pMsg->code);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
streamProcessDispatchRsp(pTask, pRsp);
|
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1379,12 +1430,12 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
STQ* pTq = pVnode->pTq;
|
STQ* pTq = pVnode->pTq;
|
||||||
char* msgStr = pMsg->pCont;
|
SMsgHead* msgStr = pMsg->pCont;
|
||||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SStreamDispatchReq req;
|
SStreamDispatchReq req;
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
|
@ -1407,16 +1458,45 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
if (pMsg->info.handle == NULL) return;
|
if (pMsg->info.handle == NULL) return -1;
|
||||||
|
|
||||||
|
SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||||
|
if (pRspHead == NULL) {
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.code = TSDB_CODE_OUT_OF_MEMORY,
|
||||||
|
.info = pMsg->info,
|
||||||
|
};
|
||||||
|
tqDebug("send dispatch error rsp, code: %x", code);
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRspHead->vgId = htonl(req.upstreamNodeId);
|
||||||
|
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
|
||||||
|
pRsp->streamId = htobe64(req.streamId);
|
||||||
|
pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
|
||||||
|
pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
|
||||||
|
pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
|
||||||
|
pRsp->downstreamTaskId = htonl(req.taskId);
|
||||||
|
pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {
|
||||||
.code = code,
|
.code = code,
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
|
.contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
|
||||||
|
.pCont = pRspHead,
|
||||||
};
|
};
|
||||||
|
tqDebug("send dispatch error rsp, code: %x", code);
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -229,8 +229,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
break;
|
break;
|
||||||
/* TQ */
|
/* TQ */
|
||||||
case TDMT_VND_TMQ_SUBSCRIBE:
|
case TDMT_VND_TMQ_SUBSCRIBE:
|
||||||
if (tqProcessSubscribeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -240,26 +239,22 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_TMQ_COMMIT_OFFSET:
|
case TDMT_VND_TMQ_COMMIT_OFFSET:
|
||||||
if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_TMQ_ADD_CHECKINFO:
|
case TDMT_VND_TMQ_ADD_CHECKINFO:
|
||||||
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_TMQ_DEL_CHECKINFO:
|
case TDMT_VND_TMQ_DEL_CHECKINFO:
|
||||||
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_STREAM_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessTaskDeployReq(pVnode->pTq, version, pReq, len) < 0) {
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
@ -273,6 +268,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
case TDMT_STREAM_TASK_CHECK_RSP: {
|
||||||
|
if (tqProcessStreamTaskCheckRsp(pVnode->pTq, version, pReq, len) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
} break;
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
||||||
break;
|
break;
|
||||||
|
@ -388,10 +388,12 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return tqProcessPollReq(pVnode->pTq, pMsg);
|
return tqProcessPollReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_RUN:
|
case TDMT_STREAM_TASK_RUN:
|
||||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||||
#if 0
|
#if 1
|
||||||
case TDMT_STREAM_TASK_DISPATCH:
|
case TDMT_STREAM_TASK_DISPATCH:
|
||||||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
||||||
#endif
|
#endif
|
||||||
|
case TDMT_STREAM_TASK_CHECK:
|
||||||
|
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_RETRIEVE:
|
case TDMT_STREAM_RETRIEVE:
|
||||||
|
|
|
@ -1884,11 +1884,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
|
||||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
|
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
|
||||||
pTSInfo->cond.startVersion = -1;
|
pTSInfo->cond.startVersion = 0;
|
||||||
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||||
|
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion,
|
||||||
|
pTSInfo->cond.endVersion);
|
||||||
} else {
|
} else {
|
||||||
pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
|
pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
|
||||||
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
|
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
|
||||||
|
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion,
|
||||||
|
pTSInfo->cond.endVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
/*resetTableScanInfo(pTSInfo, pWin);*/
|
||||||
|
@ -1905,11 +1909,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
|
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
|
||||||
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
||||||
|
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
pTSInfo->cond.startVersion = 0;
|
tsdbReaderClose(pTSInfo->dataReader);
|
||||||
|
pTSInfo->dataReader = NULL;
|
||||||
|
|
||||||
|
pTSInfo->cond.startVersion = -1;
|
||||||
pTSInfo->cond.endVersion = -1;
|
pTSInfo->cond.endVersion = -1;
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -43,6 +43,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
|
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
|
||||||
|
|
||||||
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||||
SEpSet* pEpSet);
|
SEpSet* pEpSet);
|
||||||
|
|
||||||
|
|
|
@ -135,8 +135,11 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
|
||||||
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
||||||
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
pCont->inputStatus = status;
|
pCont->inputStatus = status;
|
||||||
pCont->streamId = pReq->streamId;
|
pCont->streamId = htobe64(pReq->streamId);
|
||||||
pCont->taskId = pReq->upstreamTaskId;
|
pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
||||||
|
pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
||||||
|
pCont->downstreamNodeId = htonl(pTask->nodeId);
|
||||||
|
pCont->downstreamTaskId = htonl(pTask->taskId);
|
||||||
pRsp->pCont = buf;
|
pRsp->pCont = buf;
|
||||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
|
@ -203,10 +206,10 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||||
|
|
||||||
qDebug("task %d receive dispatch rsp", pTask->taskId);
|
qDebug("task %d receive dispatch rsp, code: %x", pTask->taskId, code);
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
|
|
|
@ -210,6 +210,46 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
|
||||||
|
void* buf = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
SRpcMsg msg = {0};
|
||||||
|
|
||||||
|
int32_t tlen;
|
||||||
|
tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, tlen, code);
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(nodeId);
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
|
if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
msg.contLen = tlen + sizeof(SMsgHead);
|
||||||
|
msg.pCont = buf;
|
||||||
|
msg.msgType = TDMT_STREAM_TASK_CHECK;
|
||||||
|
|
||||||
|
qDebug("dispatch from task %d to task %d node %d: check msg", pTask->taskId, pReq->downstreamTaskId, nodeId);
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
FAIL:
|
||||||
|
if (buf) rpcFreeCont(buf);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
|
||||||
SEpSet* pEpSet) {
|
SEpSet* pEpSet) {
|
||||||
void* buf = NULL;
|
void* buf = NULL;
|
||||||
|
@ -243,7 +283,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
||||||
code = 0;
|
qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->taskId, pReq->taskId, vgId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
if (buf) rpcFreeCont(buf);
|
if (buf) rpcFreeCont(buf);
|
||||||
|
@ -279,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
|
||||||
msg.pCont = buf;
|
msg.pCont = buf;
|
||||||
msg.msgType = pTask->dispatchMsgType;
|
msg.msgType = pTask->dispatchMsgType;
|
||||||
|
|
||||||
qDebug("dispatch from task %d to task %d node %d", pTask->taskId, pReq->taskId, vgId);
|
qDebug("dispatch from task %d to task %d node %d: data msg", pTask->taskId, pReq->taskId, vgId);
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
||||||
|
|
|
@ -202,83 +202,83 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
|
||||||
int32_t streamExecForAll(SStreamTask* pTask) {
|
int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t batchCnt = 1;
|
int32_t batchCnt = 1;
|
||||||
void* data = NULL;
|
void* input = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
|
qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (data == NULL) {
|
if (input == NULL) {
|
||||||
data = qItem;
|
input = qItem;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
void* newRet;
|
void* newRet;
|
||||||
if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) {
|
if ((newRet = streamMergeQueueItem(input, qItem)) == NULL) {
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
batchCnt++;
|
batchCnt++;
|
||||||
data = newRet;
|
input = newRet;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||||
if (data) streamFreeQitem(data);
|
if (input) streamFreeQitem(input);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (data == NULL) {
|
if (input == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||||
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
streamTaskOutput(pTask, data);
|
streamTaskOutput(pTask, input);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
|
|
||||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
|
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
streamTaskExecImpl(pTask, input, pRes);
|
||||||
qDebug("stream task %d exec end", pTask->taskId);
|
qDebug("stream task %d exec end", pTask->taskId);
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
if (taosArrayGetSize(pRes) != 0) {
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
if (qRes == NULL) {
|
if (qRes == NULL) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
streamFreeQitem(data);
|
streamFreeQitem(input);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
|
|
||||||
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
|
if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
|
||||||
qRes->childId = pTask->selfChildId;
|
qRes->childId = pTask->selfChildId;
|
||||||
qRes->sourceVer = pSubmit->ver;
|
qRes->sourceVer = pSubmit->ver;
|
||||||
} else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) {
|
} else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) {
|
||||||
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
|
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)input;
|
||||||
qRes->childId = pTask->selfChildId;
|
qRes->childId = pTask->selfChildId;
|
||||||
qRes->sourceVer = pMerged->ver;
|
qRes->sourceVer = pMerged->ver;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
streamFreeQitem(data);
|
streamFreeQitem(input);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
}
|
}
|
||||||
streamFreeQitem(data);
|
streamFreeQitem(input);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,148 @@
|
||||||
|
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
|
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
|
||||||
|
qDebug("task %d at node %d launch recover", pTask->taskId, pTask->nodeId);
|
||||||
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE);
|
||||||
|
streamSetParamForRecover(pTask);
|
||||||
|
streamSourceRecoverPrepareStep1(pTask, version);
|
||||||
|
|
||||||
|
SStreamRecoverStep1Req req;
|
||||||
|
streamBuildSourceRecover1Req(pTask, &req);
|
||||||
|
int32_t len = sizeof(SStreamRecoverStep1Req);
|
||||||
|
|
||||||
|
void* serializedReq = rpcMallocCont(len);
|
||||||
|
if (serializedReq == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(serializedReq, &req, len);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.contLen = len,
|
||||||
|
.pCont = serializedReq,
|
||||||
|
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
|
||||||
|
/*ASSERT(0);*/
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||||
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
|
||||||
|
streamSetParamForRecover(pTask);
|
||||||
|
streamAggRecoverPrepare(pTask);
|
||||||
|
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||||
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkstatus
|
||||||
|
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
|
||||||
|
SStreamTaskCheckReq req = {
|
||||||
|
.streamId = pTask->streamId,
|
||||||
|
.upstreamTaskId = pTask->taskId,
|
||||||
|
.upstreamNodeId = pTask->nodeId,
|
||||||
|
.childId = pTask->selfChildId,
|
||||||
|
};
|
||||||
|
// serialize
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
req.reqId = tGenIdPI64();
|
||||||
|
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
|
||||||
|
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
pTask->checkReqId = req.reqId;
|
||||||
|
|
||||||
|
qDebug("task %d at node %d check downstream task %d at node %d", pTask->taskId, pTask->nodeId, req.downstreamTaskId,
|
||||||
|
req.downstreamNodeId);
|
||||||
|
streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t vgSz = taosArrayGetSize(vgInfo);
|
||||||
|
pTask->recoverTryingDownstream = vgSz;
|
||||||
|
pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
req.reqId = tGenIdPI64();
|
||||||
|
taosArrayPush(pTask->checkReqIds, &req.reqId);
|
||||||
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->taskId, pTask->nodeId,
|
||||||
|
req.downstreamTaskId, req.downstreamNodeId);
|
||||||
|
streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
qDebug("task %d at node %d direct launch recover since no downstream", pTask->taskId, pTask->nodeId);
|
||||||
|
streamTaskLaunchRecover(pTask, version);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
||||||
|
SStreamTaskCheckReq req = {
|
||||||
|
.reqId = pRsp->reqId,
|
||||||
|
.streamId = pRsp->streamId,
|
||||||
|
.upstreamTaskId = pRsp->upstreamTaskId,
|
||||||
|
.upstreamNodeId = pRsp->upstreamNodeId,
|
||||||
|
.downstreamTaskId = pRsp->downstreamTaskId,
|
||||||
|
.downstreamNodeId = pRsp->downstreamNodeId,
|
||||||
|
.childId = pRsp->childId,
|
||||||
|
};
|
||||||
|
qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->taskId, pTask->nodeId,
|
||||||
|
req.downstreamTaskId, req.downstreamNodeId);
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t vgSz = taosArrayGetSize(vgInfo);
|
||||||
|
for (int32_t i = 0; i < vgSz; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
if (pVgInfo->taskId == req.downstreamTaskId) {
|
||||||
|
streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) {
|
||||||
|
return atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
|
||||||
|
qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId,
|
||||||
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);
|
||||||
|
if (pRsp->status == 1) {
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
bool found = false;
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pTask->checkReqIds); i++) {
|
||||||
|
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
|
||||||
|
if (reqId == pRsp->reqId) {
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!found) return -1;
|
||||||
|
int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
|
||||||
|
ASSERT(left >= 0);
|
||||||
|
if (left == 0) {
|
||||||
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
|
streamTaskLaunchRecover(pTask, version);
|
||||||
|
}
|
||||||
|
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
if (pRsp->reqId != pTask->checkReqId) return -1;
|
||||||
|
streamTaskLaunchRecover(pTask, version);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
streamRecheckOneDownstream(pTask, pRsp);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// common
|
// common
|
||||||
int32_t streamSetParamForRecover(SStreamTask* pTask) {
|
int32_t streamSetParamForRecover(SStreamTask* pTask) {
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
|
@ -86,10 +228,7 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
|
||||||
// agg
|
// agg
|
||||||
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
|
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
/*if (qStreamSetParamForRecover(exec) < 0) {*/
|
pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo);
|
||||||
/*return -1;*/
|
|
||||||
/*}*/
|
|
||||||
pTask->recoverWaitingChild = taosArrayGetSize(pTask->childEpInfo);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +246,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
|
||||||
|
|
||||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
||||||
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
if (pTask->taskLevel == TASK_LEVEL__AGG) {
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingChild, 1);
|
int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1);
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
streamAggChildrenRecoverFinish(pTask);
|
streamAggChildrenRecoverFinish(pTask);
|
||||||
|
@ -116,6 +255,60 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
|
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
@ -132,79 +325,6 @@ int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishR
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->reqTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->rspTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->reqTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->rspTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pReq) {
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
|
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
|
||||||
if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
|
||||||
|
@ -248,308 +368,3 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) {
|
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq) {
|
|
||||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp) {
|
|
||||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
|
|
||||||
int32_t sz = taosArrayGetSize(pRsp->checkpointVer);
|
|
||||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SStreamCheckpointInfo* pInfo = taosArrayGet(pRsp->checkpointVer, i);
|
|
||||||
if (tEncodeSStreamCheckpointInfo(pEncoder, pInfo) < 0) return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp) {
|
|
||||||
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
|
|
||||||
int32_t sz;
|
|
||||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
|
||||||
pRsp->checkpointVer = taosArrayInit(sz, sizeof(SStreamCheckpointInfo));
|
|
||||||
if (pRsp->checkpointVer == NULL) return -1;
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SStreamCheckpointInfo info;
|
|
||||||
if (tDecodeSStreamCheckpointInfo(pDecoder, &info) < 0) return -1;
|
|
||||||
taosArrayPush(pRsp->checkpointVer, &info);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
#if 0
|
|
||||||
void* buf = NULL;
|
|
||||||
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
|
|
||||||
|
|
||||||
SStreamMultiVgCheckpointInfo checkpoint;
|
|
||||||
checkpoint.checkpointId = atomic_fetch_add_32(&pTask->nextCheckId, 1);
|
|
||||||
checkpoint.checkTs = taosGetTimestampMs();
|
|
||||||
checkpoint.streamId = pTask->streamId;
|
|
||||||
checkpoint.taskId = pTask->taskId;
|
|
||||||
checkpoint.checkpointVer = pTask->checkpointInfo;
|
|
||||||
|
|
||||||
int32_t len;
|
|
||||||
int32_t code;
|
|
||||||
tEncodeSize(tEncodeSStreamMultiVgCheckpointInfo, &checkpoint, len, code);
|
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
buf = taosMemoryCalloc(1, len);
|
|
||||||
if (buf == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, buf, len);
|
|
||||||
tEncodeSStreamMultiVgCheckpointInfo(&encoder, &checkpoint);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
SStreamCheckpointKey key = {
|
|
||||||
.taskId = pTask->taskId,
|
|
||||||
.checkpointId = checkpoint.checkpointId,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (tdbTbUpsert(pMeta->pStateDb, &key, sizeof(SStreamCheckpointKey), buf, len, &pMeta->txn) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SStreamCheckpointInfo* pCheck = taosArrayGet(pTask->checkpointInfo, i);
|
|
||||||
pCheck->stateSaveVer = pCheck->stateProcessedVer;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
return 0;
|
|
||||||
FAIL:
|
|
||||||
if (buf) taosMemoryFree(buf);
|
|
||||||
return -1;
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
#if 0
|
|
||||||
void* pVal = NULL;
|
|
||||||
int32_t vLen = 0;
|
|
||||||
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SDecoder decoder;
|
|
||||||
tDecoderInit(&decoder, pVal, vLen);
|
|
||||||
SStreamMultiVgCheckpointInfo aggCheckpoint;
|
|
||||||
tDecodeSStreamMultiVgCheckpointInfo(&decoder, &aggCheckpoint);
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
|
|
||||||
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
|
|
||||||
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamSaveSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
|
|
||||||
return streamSaveStateInfo(pMeta, pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
|
|
||||||
return streamLoadStateInfo(pMeta, pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
|
|
||||||
// TODO save and copy state
|
|
||||||
|
|
||||||
// save state info
|
|
||||||
if (streamSaveStateInfo(pMeta, pTask) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) {
|
|
||||||
int32_t taskId = pVgInfo->taskId;
|
|
||||||
int32_t nodeId = pVgInfo->vgId;
|
|
||||||
SStreamRecoverDownstreamReq req = {
|
|
||||||
.streamId = pTask->taskId,
|
|
||||||
.downstreamTaskId = taskId,
|
|
||||||
.taskId = pTask->taskId,
|
|
||||||
};
|
|
||||||
int32_t tlen;
|
|
||||||
int32_t code;
|
|
||||||
tEncodeSize(tEncodeSStreamTaskRecoverReq, &req, tlen, code);
|
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
||||||
SEncoder encoder;
|
|
||||||
tEncoderInit(&encoder, abuf, tlen);
|
|
||||||
if (tEncodeSStreamTaskRecoverReq(&encoder, &req) < 0) {
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = htonl(nodeId);
|
|
||||||
SRpcMsg msg = {
|
|
||||||
.pCont = buf, .contLen = sizeof(SMsgHead) + tlen,
|
|
||||||
/*.msgType = */
|
|
||||||
};
|
|
||||||
tmsgSendReq(&pVgInfo->epSet, &msg);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
// set self status to recover_phase1
|
|
||||||
SStreamRecoverStatus* pRecover;
|
|
||||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
|
|
||||||
pRecover = taosHashGet(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t));
|
|
||||||
if (pRecover == NULL) {
|
|
||||||
pRecover = taosMemoryCalloc(1, sizeof(SStreamRecoverStatus));
|
|
||||||
if (pRecover == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
pRecover->info = taosArrayInit(0, sizeof(void*));
|
|
||||||
if (pRecover->info == NULL) {
|
|
||||||
taosMemoryFree(pRecover);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosHashPut(pMeta->pRecoverStatus, &pTask->taskId, sizeof(int32_t), &pRecover, sizeof(void*));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
pRecover->totReq = 1;
|
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
int32_t numOfDownstream = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
|
|
||||||
pRecover->totReq = numOfDownstream;
|
|
||||||
for (int32_t i = 0; i < numOfDownstream; i++) {
|
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
|
||||||
streamFetchRecoverStatus(pTask, pVgInfo);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
|
|
||||||
// if failed, set timer and retry
|
|
||||||
// if successful
|
|
||||||
int32_t taskId = pTask->taskId;
|
|
||||||
SStreamRecoverStatus* pRecover = taosHashGet(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
|
|
||||||
if (pRecover == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pRecover->info, &pRsp->checkpointVer);
|
|
||||||
|
|
||||||
int32_t leftRsp = atomic_sub_fetch_32(&pRecover->waitingRspCnt, 1);
|
|
||||||
ASSERT(leftRsp >= 0);
|
|
||||||
|
|
||||||
if (leftRsp == 0) {
|
|
||||||
ASSERT(taosArrayGetSize(pRecover->info) == pRecover->totReq);
|
|
||||||
|
|
||||||
// srcNodeId -> SStreamCheckpointInfo*
|
|
||||||
SHashObj* pFinalChecks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
|
||||||
if (pFinalChecks == NULL) return -1;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRecover->totReq; i++) {
|
|
||||||
SArray* pChecks = taosArrayGetP(pRecover->info, i);
|
|
||||||
int32_t sz = taosArrayGetSize(pChecks);
|
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
|
||||||
SStreamCheckpointInfo* pOneCheck = taosArrayGet(pChecks, j);
|
|
||||||
SStreamCheckpointInfo* pCheck = taosHashGet(pFinalChecks, &pOneCheck->srcNodeId, sizeof(int32_t));
|
|
||||||
if (pCheck == NULL) {
|
|
||||||
pCheck = taosMemoryCalloc(1, sizeof(SStreamCheckpointInfo));
|
|
||||||
pCheck->srcNodeId = pOneCheck->srcNodeId;
|
|
||||||
pCheck->srcChildId = pOneCheck->srcChildId;
|
|
||||||
pCheck->stateProcessedVer = pOneCheck->stateProcessedVer;
|
|
||||||
taosHashPut(pFinalChecks, &pCheck->srcNodeId, sizeof(int32_t), &pCheck, sizeof(void*));
|
|
||||||
} else {
|
|
||||||
pCheck->stateProcessedVer = TMIN(pCheck->stateProcessedVer, pOneCheck->stateProcessedVer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// load local state
|
|
||||||
//
|
|
||||||
// recover
|
|
||||||
//
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
|
|
||||||
if (streamPipelineExec(pTask, 10000, true) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
taosHashCleanup(pFinalChecks);
|
|
||||||
taosHashRemove(pMeta->pRecoverStatus, &taskId, sizeof(int32_t));
|
|
||||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
|
|
||||||
// recover sink level
|
|
||||||
// after all sink level recovered
|
|
||||||
// choose suitable state to recover
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamSaveSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
|
||||||
// TODO: save and copy state
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|
||||||
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
|
|
||||||
// if totLevel == 3
|
|
||||||
// fetch agg state
|
|
||||||
// recover from local state to agg state, not send msg
|
|
||||||
// recover from agg state to most recent log v1
|
|
||||||
// enable input queue, set status recover_phase2
|
|
||||||
// recover from v1 to queue msg v2, set status normal
|
|
||||||
|
|
||||||
// if totLevel == 2
|
|
||||||
// fetch sink state
|
|
||||||
// recover from local state to sink state v1, send msg
|
|
||||||
// enable input queue, set status recover_phase2
|
|
||||||
// recover from v1 to queue msg v2, set status normal
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamRecoverTask(SStreamTask* pTask) {
|
|
||||||
//
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
|
@ -633,6 +633,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
|
||||||
|
|
||||||
|
// stream
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
|
|
||||||
// TDLite
|
// TDLite
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
|
||||||
|
|
|
@ -235,6 +235,8 @@
|
||||||
./test.sh -f tsim/stream/basic2.sim
|
./test.sh -f tsim/stream/basic2.sim
|
||||||
./test.sh -f tsim/stream/drop_stream.sim
|
./test.sh -f tsim/stream/drop_stream.sim
|
||||||
./test.sh -f tsim/stream/fillHistoryBasic1.sim
|
./test.sh -f tsim/stream/fillHistoryBasic1.sim
|
||||||
|
./test.sh -f tsim/stream/fillHistoryBasic2.sim
|
||||||
|
./test.sh -f tsim/stream/fillHistoryBasic3.sim
|
||||||
./test.sh -f tsim/stream/distributeInterval0.sim
|
./test.sh -f tsim/stream/distributeInterval0.sim
|
||||||
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
|
||||||
./test.sh -f tsim/stream/distributeSession0.sim
|
./test.sh -f tsim/stream/distributeSession0.sim
|
||||||
|
|
|
@ -26,7 +26,7 @@ sql insert into t1 values(1648791243003,4,2,3,3.1);
|
||||||
sql insert into t1 values(1648791213004,4,2,3,4.1);
|
sql insert into t1 values(1648791213004,4,2,3,4.1);
|
||||||
|
|
||||||
|
|
||||||
sleep 1000
|
sleep 5000
|
||||||
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
|
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
|
||||||
|
|
||||||
if $rows != 4 then
|
if $rows != 4 then
|
||||||
|
@ -466,9 +466,6 @@ if $data25 != 3 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
sql create database test2 vgroups 1;
|
sql create database test2 vgroups 1;
|
||||||
sql select * from information_schema.ins_databases;
|
sql select * from information_schema.ins_databases;
|
||||||
|
|
||||||
|
@ -484,7 +481,7 @@ sql insert into t1 values(1648791213004,4,2,3,4.1);
|
||||||
|
|
||||||
sql create stream stream2 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
|
sql create stream stream2 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
|
||||||
|
|
||||||
sleep 1000
|
sleep 5000
|
||||||
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
|
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
|
||||||
|
|
||||||
if $rows != 4 then
|
if $rows != 4 then
|
||||||
|
|
|
@ -0,0 +1,277 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/deploy.sh -n dnode2 -i 2
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql create dnode $hostname2 port 7200
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
|
||||||
|
print ===== step1
|
||||||
|
$x = 0
|
||||||
|
step1:
|
||||||
|
$x = $x + 1
|
||||||
|
sleep 1000
|
||||||
|
if $x == 10 then
|
||||||
|
print ====> dnode not ready!
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql select * from information_schema.ins_dnodes
|
||||||
|
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||||
|
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data(1)[4] != ready then
|
||||||
|
goto step1
|
||||||
|
endi
|
||||||
|
if $data(2)[4] != ready then
|
||||||
|
goto step1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ===== step2
|
||||||
|
sql drop stream if exists stream_t1;
|
||||||
|
sql drop database if exists test;
|
||||||
|
sql create database test vgroups 4;
|
||||||
|
sql use test;
|
||||||
|
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
|
||||||
|
sql create table ts1 using st tags(1,1,1);
|
||||||
|
sql create table ts2 using st tags(2,2,2);
|
||||||
|
sql create table ts3 using st tags(3,2,2);
|
||||||
|
sql create table ts4 using st tags(4,2,2);
|
||||||
|
|
||||||
|
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||||
|
sql insert into ts2 values(1648791213001,1,12,3,1.0);
|
||||||
|
|
||||||
|
sql insert into ts3 values(1648791213001,1,12,3,1.0);
|
||||||
|
sql insert into ts4 values(1648791213001,1,12,3,1.0);
|
||||||
|
|
||||||
|
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||||
|
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||||
|
|
||||||
|
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||||
|
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
|
||||||
|
|
||||||
|
sql insert into ts1 values(1648791223002,2,2,3,1.1);
|
||||||
|
sql insert into ts1 values(1648791233003,3,2,3,2.1);
|
||||||
|
sql insert into ts2 values(1648791243004,4,2,43,73.1);
|
||||||
|
sql insert into ts1 values(1648791213002,24,22,23,4.1);
|
||||||
|
sql insert into ts1 values(1648791243005,4,20,3,3.1);
|
||||||
|
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||||
|
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||||
|
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||||
|
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||||
|
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||||
|
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||||
|
|
||||||
|
sql insert into ts3 values(1648791223002,2,2,3,1.1);
|
||||||
|
sql insert into ts4 values(1648791233003,3,2,3,2.1);
|
||||||
|
sql insert into ts3 values(1648791243004,4,2,43,73.1);
|
||||||
|
sql insert into ts4 values(1648791213002,24,22,23,4.1);
|
||||||
|
sql insert into ts3 values(1648791243005,4,20,3,3.1);
|
||||||
|
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
|
||||||
|
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
|
||||||
|
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
|
||||||
|
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||||
|
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
|
||||||
|
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
|
||||||
|
|
||||||
|
sql create stream stream_t1 trigger at_once fill_history 1 watermark 1d into streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop1:
|
||||||
|
sleep 300
|
||||||
|
sql select * from streamtST1;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 8 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 6 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data03 != 52 then
|
||||||
|
print ======data03=$data03
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data04 != 52 then
|
||||||
|
print ======data04=$data04
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data05 != 13 then
|
||||||
|
print ======data05=$data05
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data11 != 6 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 6 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 != 92 then
|
||||||
|
print ======$data13
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data14 != 22 then
|
||||||
|
print ======$data14
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data15 != 3 then
|
||||||
|
print ======$data15
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 2
|
||||||
|
if $data21 != 4 then
|
||||||
|
print =====data21=$data21
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data22 != 4 then
|
||||||
|
print =====data22=$data22
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data23 != 32 then
|
||||||
|
print ======$data23
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data24 != 12 then
|
||||||
|
print ======$data24
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data25 != 3 then
|
||||||
|
print ======$data25
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 3
|
||||||
|
if $data31 != 30 then
|
||||||
|
print =====data31=$data31
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != 30 then
|
||||||
|
print =====data32=$data32
|
||||||
|
goto loop1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data33 != 180 then
|
||||||
|
print ======$data33
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data34 != 42 then
|
||||||
|
print ======$data34
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data35 != 3 then
|
||||||
|
print ======$data35
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
|
||||||
|
|
||||||
|
|
||||||
|
sql create database test1 vgroups 4;
|
||||||
|
sql use test1;
|
||||||
|
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table ts1 using st tags(1,1,1);
|
||||||
|
sql create table ts2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql insert into ts1 values(1648791211000,1,2,3);
|
||||||
|
sql insert into ts1 values(1648791222001,2,2,3);
|
||||||
|
sql insert into ts2 values(1648791211000,1,2,3);
|
||||||
|
sql insert into ts2 values(1648791222001,2,2,3);
|
||||||
|
|
||||||
|
sql create stream stream_t2 trigger at_once fill_history 1 watermark 20s into streamtST1 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop2:
|
||||||
|
sql select * from streamtST1;
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
#rows 1
|
||||||
|
if $data11 != 2 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop2
|
||||||
|
endi
|
||||||
|
|
||||||
|
#max,min selectivity
|
||||||
|
sql create database test3 vgroups 4;
|
||||||
|
sql use test3;
|
||||||
|
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
|
||||||
|
sql create table ts1 using st tags(1,1,1);
|
||||||
|
sql create table ts2 using st tags(2,2,2);
|
||||||
|
sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ;
|
||||||
|
|
||||||
|
sql insert into ts1 values(1648791211000,1,2,3);
|
||||||
|
sleep 50
|
||||||
|
sql insert into ts1 values(1648791222001,2,2,3);
|
||||||
|
sleep 50
|
||||||
|
sql insert into ts2 values(1648791211000,1,2,3);
|
||||||
|
sleep 50
|
||||||
|
sql insert into ts2 values(1648791222001,2,2,3);
|
||||||
|
sleep 50
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop3:
|
||||||
|
sql select * from streamtST3;
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 1
|
||||||
|
if $data12 != 2 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop3
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
|
@ -0,0 +1,203 @@
|
||||||
|
$loop_all = 0
|
||||||
|
looptest:
|
||||||
|
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sleep 50
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql create database test vgroups 1;
|
||||||
|
sql create database test2 vgroups 4;
|
||||||
|
sql use test2;
|
||||||
|
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||||
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||||
|
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||||
|
|
||||||
|
sql create stream streams2 trigger at_once fill_history 1 into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s);
|
||||||
|
|
||||||
|
sleep 3000
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop7:
|
||||||
|
sleep 50
|
||||||
|
sql select * from test.streamt2 order by c1, c2, c3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != NULL then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213000,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop8:
|
||||||
|
sleep 50
|
||||||
|
sql select * from test.streamt2 order by c1, c2, c3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop8
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,2,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213000,2,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop9:
|
||||||
|
sleep 50
|
||||||
|
sql select * from test.streamt2 order by c1, c2, c3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop9
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 2 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop9
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,2,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,2,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213002,2,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213002,1,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213000,2,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213001,2,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213002,2,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213002,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop10:
|
||||||
|
sleep 50
|
||||||
|
sql select * from test.streamt2 order by c1, c2, c3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop10
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 1 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop10
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 4 thenloop4
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop10
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 2 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop10
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791223000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791223001,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791223002,3,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791223003,3,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791223000,1,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791223001,1,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791223002,3,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791223003,3,2,3,1.0);
|
||||||
|
sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop11:
|
||||||
|
sleep 50
|
||||||
|
sql select * from test.streamt2 order by c1, c2, c3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print =====data01=$data01
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data02 != 2 then
|
||||||
|
print =====data02=$data02
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 4 then
|
||||||
|
print =====data11=$data11
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data12 != 1 then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 2 then
|
||||||
|
print =====data21=$data21
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data22 != 2 then
|
||||||
|
print =====data22=$data22
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 2 then
|
||||||
|
print =====data31=$data31
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != 3 then
|
||||||
|
print =====data32=$data32
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data41 != 4 then
|
||||||
|
print =====data41=$data41
|
||||||
|
goto loop11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data42 != 1 then
|
||||||
|
print =====data42=$data42
|
||||||
|
goto loop11
|
||||||
|
endi
|
Loading…
Reference in New Issue