Merge pull request #18554 from taosdata/feature/stream
fix(stream): delete multiple row
This commit is contained in:
commit
4b11f358dd
|
@ -1592,14 +1592,14 @@ typedef struct SSubQueryMsg {
|
|||
int8_t explain;
|
||||
int8_t needFetch;
|
||||
uint32_t sqlLen;
|
||||
char *sql;
|
||||
char* sql;
|
||||
uint32_t msgLen;
|
||||
char *msg;
|
||||
char* msg;
|
||||
} SSubQueryMsg;
|
||||
|
||||
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
|
||||
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
|
||||
void tFreeSSubQueryMsg(SSubQueryMsg *pReq);
|
||||
int32_t tSerializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq);
|
||||
int32_t tDeserializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq);
|
||||
void tFreeSSubQueryMsg(SSubQueryMsg* pReq);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
|
@ -1638,9 +1638,8 @@ typedef struct {
|
|||
int32_t execId;
|
||||
} SResFetchReq;
|
||||
|
||||
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
|
||||
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
|
||||
|
||||
int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
|
||||
int32_t tDeserializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
|
@ -1713,12 +1712,11 @@ typedef struct {
|
|||
int32_t execId;
|
||||
} STaskDropReq;
|
||||
|
||||
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
|
||||
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
|
||||
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
|
||||
int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
|
||||
int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
|
||||
|
||||
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
|
||||
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
|
@ -2923,9 +2921,8 @@ typedef struct {
|
|||
STqOffsetVal reqOffset;
|
||||
} SMqPollReq;
|
||||
|
||||
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
|
||||
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
|
||||
|
||||
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
|
||||
int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
|
@ -3138,7 +3135,8 @@ int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
|
|||
typedef struct {
|
||||
// int64_t uid;
|
||||
char tbname[TSDB_TABLE_NAME_LEN];
|
||||
int64_t ts;
|
||||
int64_t startTs;
|
||||
int64_t endTs;
|
||||
} SSingleDeleteReq;
|
||||
|
||||
int32_t tEncodeSSingleDeleteReq(SEncoder* pCoder, const SSingleDeleteReq* pReq);
|
||||
|
@ -3160,8 +3158,8 @@ typedef struct {
|
|||
} SBatchMsg;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
SArray* pMsgs; //SArray<SBatchMsg>
|
||||
SMsgHead header;
|
||||
SArray* pMsgs; // SArray<SBatchMsg>
|
||||
} SBatchReq;
|
||||
|
||||
typedef struct {
|
||||
|
@ -3173,11 +3171,11 @@ typedef struct {
|
|||
} SBatchRspMsg;
|
||||
|
||||
typedef struct {
|
||||
SArray* pRsps; //SArray<SBatchRspMsg>
|
||||
SArray* pRsps; // SArray<SBatchRspMsg>
|
||||
} SBatchRsp;
|
||||
|
||||
int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
|
||||
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
|
||||
int32_t tSerializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq);
|
||||
int32_t tDeserializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq);
|
||||
static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
|
||||
if (NULL == msg) {
|
||||
return;
|
||||
|
@ -3186,8 +3184,8 @@ static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
|
|||
taosMemoryFree(pMsg->msg);
|
||||
}
|
||||
|
||||
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
|
||||
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
|
||||
int32_t tSerializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp);
|
||||
int32_t tDeserializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp);
|
||||
|
||||
static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
|
||||
if (NULL == p) {
|
||||
|
@ -3198,11 +3196,10 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
|
|||
taosMemoryFree(pRsp->msg);
|
||||
}
|
||||
|
||||
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
|
||||
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
|
||||
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
|
||||
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
|
||||
|
||||
int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
||||
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
|
||||
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
|
|
|
@ -280,8 +280,8 @@ enum {
|
|||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP1, "vnode-stream-recover1", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_STEP2, "vnode-stream-recover2", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
|
||||
|
|
|
@ -4496,7 +4496,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
|
|||
if (num <= 0) {
|
||||
pReq->pMsgs = NULL;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -4511,7 +4511,7 @@ int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq) {
|
|||
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
|
||||
if (NULL == taosArrayPush(pReq->pMsgs, &msg)) return -1;
|
||||
}
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4553,7 +4553,7 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
|
|||
if (num <= 0) {
|
||||
pRsp->pRsps = NULL;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
@ -4569,14 +4569,13 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
|
|||
if (tDecodeBinaryAlloc(&decoder, &msg.msg, NULL) < 0) return -1;
|
||||
if (NULL == taosArrayPush(pRsp->pRsps, &msg)) return -1;
|
||||
}
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
@ -4603,7 +4602,7 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4634,7 +4633,7 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
|||
|
||||
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4664,7 +4663,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
|
|||
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
|
||||
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
|
||||
if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1;
|
||||
if (tEncodeBinary(&encoder, (uint8_t *)pReq->msg, pReq->msgLen) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -4704,8 +4703,8 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq)
|
|||
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
|
||||
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;
|
||||
if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1;
|
||||
|
||||
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, NULL) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4721,7 +4720,6 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
|
|||
taosMemoryFreeClear(pReq->msg);
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
|
@ -4768,14 +4766,13 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
|
|||
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
|
||||
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
|
||||
|
@ -4846,14 +4843,13 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->timeout) < 0) return -1;
|
||||
if (tDerializeSTqOffsetVal(&decoder, &pReq->reqOffset) < 0) return -1;
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
|
@ -4902,7 +4898,7 @@ int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq)
|
|||
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->refId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1;
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4939,14 +4935,13 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
|
|||
if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1;
|
||||
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
|
@ -6645,13 +6640,15 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
|
|||
|
||||
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
||||
if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->endTs) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
|
||||
if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->endTs) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -1127,7 +1127,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SRpcMsg rpcMsg = {
|
||||
.code = 0,
|
||||
.contLen = len,
|
||||
.msgType = TDMT_VND_STREAM_RECOVER_STEP2,
|
||||
.msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE,
|
||||
.pCont = serializedReq,
|
||||
};
|
||||
|
||||
|
|
|
@ -21,14 +21,16 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
|||
SBatchDeleteReq* deleteReq) {
|
||||
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
|
||||
int32_t totRow = pDataBlock->info.rows;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
|
||||
tqDebug("stream delete msg: row %d", totRow);
|
||||
|
||||
for (int32_t row = 0; row < totRow; row++) {
|
||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
|
||||
int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row);
|
||||
int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row);
|
||||
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
|
||||
char* name;
|
||||
void* varTbName = NULL;
|
||||
|
@ -42,8 +44,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
|||
} else {
|
||||
name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||
}
|
||||
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId,
|
||||
name, ts);
|
||||
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64,
|
||||
pVnode->config.vgId, groupId, name, startTs, endTs);
|
||||
#if 0
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
|
@ -59,7 +61,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
|||
taosMemoryFree(name);
|
||||
#endif
|
||||
SSingleDeleteReq req = {
|
||||
.ts = ts,
|
||||
.startTs = startTs,
|
||||
.endTs = endTs,
|
||||
};
|
||||
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
|
||||
taosMemoryFree(name);
|
||||
|
|
|
@ -263,7 +263,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_VND_STREAM_RECOVER_STEP2: {
|
||||
case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
|
||||
if (tqProcessTaskRecover2Req(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -402,7 +402,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_RSP:
|
||||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_RECOVER_STEP1:
|
||||
case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
|
||||
return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RECOVER_FINISH:
|
||||
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
|
||||
|
@ -1184,11 +1184,11 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
|
|||
|
||||
int64_t uid = mr.me.uid;
|
||||
|
||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
|
||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||
if (code < 0) {
|
||||
terrno = code;
|
||||
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
|
||||
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
|
||||
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
|
||||
}
|
||||
|
||||
tDecoderClear(&mr.coder);
|
||||
|
|
|
@ -1313,8 +1313,8 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_
|
|||
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
||||
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
|
||||
tdbFree(tbname);
|
||||
}
|
||||
tdbFree(tbname);
|
||||
|
||||
pBlock->info.rows++;
|
||||
}
|
||||
|
@ -1427,7 +1427,7 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
|
|||
streamStateFreeCur(pCur);
|
||||
pCur = streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
|
||||
}
|
||||
endTs = nextKey.ts - 1;
|
||||
endTs = TMAX(ts, nextKey.ts - 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -3117,7 +3117,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
if (!winInfo.pOutputBuf) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
||||
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
|
||||
pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
|
@ -3242,8 +3242,8 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
|
|||
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
||||
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
|
||||
tdbFree(tbname);
|
||||
}
|
||||
tdbFree(tbname);
|
||||
pBlock->info.rows += 1;
|
||||
}
|
||||
if ((*Ite) == NULL) {
|
||||
|
|
|
@ -16,7 +16,8 @@
|
|||
#include "streamInc.h"
|
||||
|
||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
||||
void* exec = pTask->exec.executor;
|
||||
int32_t code;
|
||||
void* exec = pTask->exec.executor;
|
||||
|
||||
// set input
|
||||
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
|
||||
|
@ -49,8 +50,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
|||
while (1) {
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
if ((code = qExecTask(exec, &output, &ts)) < 0) {
|
||||
/*ASSERT(false);*/
|
||||
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
|
||||
terrstr());
|
||||
}
|
||||
if (output == NULL) {
|
||||
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
|
|
|
@ -36,7 +36,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
|
|||
SRpcMsg rpcMsg = {
|
||||
.contLen = len,
|
||||
.pCont = serializedReq,
|
||||
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
|
||||
.msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE,
|
||||
};
|
||||
|
||||
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
|
||||
|
|
|
@ -76,7 +76,8 @@ int32_t tsem_wait(tsem_t* sem) {
|
|||
}
|
||||
|
||||
int32_t tsem_timewait(tsem_t* sem, int64_t milis) {
|
||||
return tsem_wait(sem);
|
||||
return 0;
|
||||
/*return tsem_wait(sem);*/
|
||||
#if 0
|
||||
struct timespec ts;
|
||||
timespec_get(&ts);
|
||||
|
|
Loading…
Reference in New Issue