feat(tmq): offset support snapshot

This commit is contained in:
Liu Jicong 2022-06-29 20:57:15 +08:00
parent 58803fafcf
commit c2955a807b
24 changed files with 810 additions and 458 deletions

View File

@ -138,7 +138,7 @@ int32_t create_topic() {
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1 where t1 = 2000"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -225,7 +225,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
} }
int32_t cnt = 0; int32_t cnt = 0;
while (running) { while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, -1);
if (tmqmessage) { if (tmqmessage) {
cnt++; cnt++;
msg_process(tmqmessage); msg_process(tmqmessage);

View File

@ -25,10 +25,11 @@
extern "C" { extern "C" {
#endif #endif
// TODO remove it
enum { enum {
TMQ_CONF__RESET_OFFSET__LATEST = -1,
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
TMQ_CONF__RESET_OFFSET__NONE = -3, TMQ_CONF__RESET_OFFSET__NONE = -3,
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
TMQ_CONF__RESET_OFFSET__LATEST = -1,
}; };
enum { enum {
@ -39,6 +40,16 @@ enum {
TMQ_MSG_TYPE__END_RSP, TMQ_MSG_TYPE__END_RSP,
}; };
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__DATA_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
};
typedef enum EStreamType { typedef enum EStreamType {
STREAM_NORMAL = 1, STREAM_NORMAL = 1,
STREAM_INVERT, STREAM_INVERT,
@ -48,7 +59,7 @@ typedef enum EStreamType {
STREAM_DELETE, STREAM_DELETE,
STREAM_RETRIEVE, STREAM_RETRIEVE,
STREAM_PUSH_DATA, STREAM_PUSH_DATA,
STREAM_PUSH_EMPTY, STREAM_PUSH_OVER,
} EStreamType; } EStreamType;
typedef struct { typedef struct {

View File

@ -2462,21 +2462,36 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pRe
// tqOffset // tqOffset
enum { enum {
TMQ_OFFSET__SNAPSHOT = 1, TMQ_OFFSET__RESET_NONE = -3,
TMQ_OFFSET__LOG, TMQ_OFFSET__RESET_EARLIEAST = -2,
TMQ_OFFSET__RESET_LATEST = -1,
TMQ_OFFSET__LOG = 1,
TMQ_OFFSET__SNAPSHOT_DATA = 2,
TMQ_OFFSET__SNAPSHOT_META = 3,
}; };
typedef struct { typedef struct {
int8_t type; int8_t type;
union { union {
// snapshot data
struct { struct {
int64_t uid; int64_t uid;
int64_t ts; int64_t ts;
}; };
// log
struct { struct {
int64_t version; int64_t version;
}; };
}; };
} STqOffsetVal;
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
typedef struct {
STqOffsetVal val;
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
} STqOffset; } STqOffset;
@ -2710,7 +2725,8 @@ typedef struct {
uint64_t reqId; uint64_t reqId;
int64_t consumerId; int64_t consumerId;
int64_t timeout; int64_t timeout;
int64_t currentOffset; // int64_t currentOffset;
STqOffsetVal reqOffset;
} SMqPollReq; } SMqPollReq;
typedef struct { typedef struct {
@ -2782,6 +2798,8 @@ typedef struct {
SMqRspHead head; SMqRspHead head;
int64_t reqOffset; int64_t reqOffset;
int64_t rspOffset; int64_t rspOffset;
STqOffsetVal reqOffsetNew;
STqOffsetVal rspOffsetNew;
int16_t resMsgType; int16_t resMsgType;
int32_t metaRspLen; int32_t metaRspLen;
void* metaRsp; void* metaRsp;
@ -2806,6 +2824,24 @@ static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
return (void*)buf; return (void*)buf;
} }
typedef struct {
SMqRspHead head;
STqOffsetVal reqOffset;
STqOffsetVal rspOffset;
int32_t skipLogNum;
int32_t blockNum;
int8_t withTbName;
int8_t withSchema;
SArray* blockDataLen;
SArray* blockData;
SArray* blockTbName;
SArray* blockSchema;
} SMqDataRsp;
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
#if 0
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
int64_t reqOffset; int64_t reqOffset;
@ -2814,13 +2850,10 @@ typedef struct {
int32_t blockNum; int32_t blockNum;
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag;
SArray* blockDataLen; // SArray<int32_t> SArray* blockDataLen; // SArray<int32_t>
SArray* blockData; // SArray<SRetrieveTableRsp*> SArray* blockData; // SArray<SRetrieveTableRsp*>
SArray* blockTbName; // SArray<char*> SArray* blockTbName; // SArray<char*>
SArray* blockSchema; // SArray<SSchemaWrapper> SArray* blockSchema; // SArray<SSchemaWrapper>
SArray* blockTags; // SArray<kvrow>
SArray* blockTagSchema; // SArray<kvrow>
} SMqDataBlkRsp; } SMqDataBlkRsp;
static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) {
@ -2832,7 +2865,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withTbName);
tlen += taosEncodeFixedI8(buf, pRsp->withSchema); tlen += taosEncodeFixedI8(buf, pRsp->withSchema);
tlen += taosEncodeFixedI8(buf, pRsp->withTag);
for (int32_t i = 0; i < pRsp->blockNum; i++) { for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i);
@ -2862,7 +2894,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag);
if (pRsp->withTbName) { if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
} }
@ -2891,6 +2922,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
} }
return (void*)buf; return (void*)buf;
} }
#endif
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;

View File

@ -36,15 +36,8 @@ typedef struct SReadHandle {
void* vnode; void* vnode;
void* mnd; void* mnd;
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
// int8_t initTsdbReader;
} SReadHandle; } SReadHandle;
enum {
STREAM_DATA_TYPE_SUBMIT_BLOCK = 1,
STREAM_DATA_TYPE_SSDATA_BLOCK = 2,
STREAM_DATA_TYPE_FROM_SNAPSHOT = 3,
};
typedef enum { typedef enum {
OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_BATCH = 0x1,
OPTR_EXEC_MODEL_STREAM = 0x2, OPTR_EXEC_MODEL_STREAM = 0x2,

View File

@ -55,15 +55,6 @@ enum {
TASK_OUTPUT_STATUS__BLOCKED, TASK_OUTPUT_STATUS__BLOCKED,
}; };
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
};
typedef struct { typedef struct {
int8_t type; int8_t type;
} SStreamQueueItem; } SStreamQueueItem;
@ -152,10 +143,6 @@ typedef struct {
void* executor; void* executor;
} STaskExec; } STaskExec;
typedef struct {
int32_t taskId;
} STaskDispatcherInplace;
typedef struct { typedef struct {
int32_t taskId; int32_t taskId;
int32_t nodeId; int32_t nodeId;
@ -208,7 +195,6 @@ enum {
enum { enum {
TASK_DISPATCH__NONE = 1, TASK_DISPATCH__NONE = 1,
TASK_DISPATCH__INPLACE,
TASK_DISPATCH__FIXED, TASK_DISPATCH__FIXED,
TASK_DISPATCH__SHUFFLE, TASK_DISPATCH__SHUFFLE,
}; };
@ -260,7 +246,7 @@ struct SStreamTask {
// exec // exec
STaskExec exec; STaskExec exec;
// TODO: merge sink and dispatch // TODO: unify sink and dispatch
// local sink // local sink
union { union {
@ -269,9 +255,8 @@ struct SStreamTask {
STaskSinkFetch fetchSink; STaskSinkFetch fetchSink;
}; };
// dispatch // remote dispatcher
union { union {
STaskDispatcherInplace inplaceDispatcher;
STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher; STaskDispatcherShuffle shuffleDispatcher;
}; };

View File

@ -58,7 +58,6 @@ enum {
#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META) #define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
@ -194,7 +193,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
SSchemaWrapper schema; SSchemaWrapper schema;
int32_t resIter; int32_t resIter;
SMqDataBlkRsp rsp; SMqDataRsp rsp;
SReqResultInfo resInfo; SReqResultInfo resInfo;
} SMqRspObj; } SMqRspObj;

View File

@ -179,8 +179,6 @@ void taos_free_result(TAOS_RES *res) {
SMqRspObj *pRsp = (SMqRspObj *)res; SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree); if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen); if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags);
if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema);
if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree); if (pRsp->rsp.withTbName) taosArrayDestroyP(pRsp->rsp.blockTbName, taosMemoryFree);
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL; pRsp->resInfo.pRspMsg = NULL;

View File

@ -126,8 +126,10 @@ typedef struct {
// statistics // statistics
int64_t pollCnt; int64_t pollCnt;
// offset // offset
int64_t committedOffset; /*int64_t committedOffset;*/
int64_t currentOffset; /*int64_t currentOffset;*/
STqOffsetVal committedOffsetNew;
STqOffsetVal currentOffsetNew;
// connection info // connection info
int32_t vgId; int32_t vgId;
int32_t vgStatus; int32_t vgStatus;
@ -152,7 +154,7 @@ typedef struct {
SMqClientVg* vgHandle; SMqClientVg* vgHandle;
SMqClientTopic* topicHandle; SMqClientTopic* topicHandle;
union { union {
SMqDataBlkRsp dataRsp; SMqDataRsp dataRsp;
SMqMetaRsp metaRsp; SMqMetaRsp metaRsp;
}; };
} SMqPollRspWrapper; } SMqPollRspWrapper;
@ -179,6 +181,7 @@ typedef struct {
tsem_t rspSem; tsem_t rspSem;
} SMqPollCbParam; } SMqPollCbParam;
#if 0
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
int8_t async; int8_t async;
@ -190,12 +193,13 @@ typedef struct {
SArray* offsets; SArray* offsets;
void* userParam; void* userParam;
} SMqCommitCbParam; } SMqCommitCbParam;
#endif
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
int8_t automatic; int8_t automatic;
int8_t async; int8_t async;
int8_t freeOffsets; /*int8_t freeOffsets;*/
int32_t waitingRspNum; int32_t waitingRspNum;
int32_t totalRspNum; int32_t totalRspNum;
int32_t rspErr; int32_t rspErr;
@ -351,6 +355,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg); return sprintf(dst, "%s:%d", topicName, vg);
} }
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
@ -371,6 +376,7 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
return 0; return 0;
} }
#endif
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
@ -413,13 +419,76 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
return 0; return 0;
} }
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
void* userParam) { STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
int32_t code = -1; if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pOffset->val = pVg->currentOffsetNew;
if (msg != NULL) { int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen);
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
return -1;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) return -1;
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
return -1;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId,
pOffset->val.version);
// TODO: put into cb
pVg->committedOffsetNew = pVg->currentOffsetNew;
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
return 0;
}
int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
char* topic; char* topic;
int32_t vgId; int32_t vgId;
ASSERT(msg != NULL);
if (TD_RES_TMQ(msg)) { if (TD_RES_TMQ(msg)) {
SMqRspObj* pRspObj = (SMqRspObj*)msg; SMqRspObj* pRspObj = (SMqRspObj*)msg;
topic = pRspObj->topic; topic = pRspObj->topic;
@ -438,92 +507,32 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
return -1; return -1;
} }
pParamSet->tmq = tmq; pParamSet->tmq = tmq;
pParamSet->automatic = automatic; pParamSet->automatic = 0;
pParamSet->async = async; pParamSet->async = async;
pParamSet->freeOffsets = 1; /*pParamSet->freeOffsets = 1;*/
pParamSet->userCb = userCb; pParamSet->userCb = userCb;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0); tsem_init(&pParamSet->rspSem, 0, 0);
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, topic) == 0) { if (strcmp(pTopic->topicName, topic) != 0) continue;
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) { if (pVg->vgId != vgId) continue;
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
return 0; if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
} if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
goto FAIL;
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pOffset->type = TMQ_OFFSET__LOG;
pOffset->version = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen);
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
// TODO
continue;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->version);
// TODO: put into cb
pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
} }
goto HANDLE_RSP;
} }
} }
} }
int32_t code = -1;
HANDLE_RSP:
if (pParamSet->totalRspNum == 0) { if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem); tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
@ -534,20 +543,26 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
tsem_wait(&pParamSet->rspSem); tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr; code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem); tsem_destroy(&pParamSet->rspSem);
return code;
} else { } else {
code = 0; code = 0;
} }
FAIL:
if (code != 0 && async) { if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else {
userCb(tmq, code, userParam); userCb(tmq, code, userParam);
} }
}
return 0; return 0;
} }
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
int32_t code = -1;
if (msg != NULL) {
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
}
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) { if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -556,7 +571,7 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
pParamSet->tmq = tmq; pParamSet->tmq = tmq;
pParamSet->automatic = automatic; pParamSet->automatic = automatic;
pParamSet->async = async; pParamSet->async = async;
pParamSet->freeOffsets = 1; /*pParamSet->freeOffsets = 1;*/
pParamSet->userCb = userCb; pParamSet->userCb = userCb;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0); tsem_init(&pParamSet->rspSem, 0, 0);
@ -572,75 +587,11 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId); tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId);
/*if (pVg->currentOffset < 0) {*/ if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
continue; continue;
} }
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
pOffset->type = TMQ_OFFSET__LOG;
pOffset->version = pVg->currentOffset;
int32_t groupLen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, groupLen);
pOffset->subKey[groupLen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
// TODO
continue;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId,
pOffset->version);
// TODO: put into cb
pVg->committedOffset = pVg->currentOffset;
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
} }
} }
@ -1174,7 +1125,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (rspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp); SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
/*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
} else { } else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
@ -1226,9 +1180,11 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
for (int32_t j = 0; j < vgNumCur; j++) { for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %ld", tmq->consumerId, epoch, pVgCur->vgId, vgKey, char buf[50];
pVgCur->currentOffset); tFormatOffset(buf, 50, &pVgCur->currentOffsetNew);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey,
buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
} }
} }
} }
@ -1257,7 +1213,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
pVgEp->vgId, offset, vgKey); pVgEp->vgId, offset, vgKey);
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffset = offset, .currentOffsetNew.type = tmq->resetOffsetCfg,
.vgId = pVgEp->vgId, .vgId = pVgEp->vgId,
.epSet = pVgEp->epSet, .epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE, .vgStatus = TMQ_VG_STATUS__IDLE,
@ -1281,7 +1237,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
return set; return set;
} }
#if 1 #if 0
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/ /*printf("call update ep %d\n", epoch);*/
bool set = false; bool set = false;
@ -1516,16 +1472,16 @@ int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
#endif #endif
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset; /*int64_t reqOffset;*/
if (pVg->currentOffset >= 0) { /*if (pVg->currentOffset >= 0) {*/
reqOffset = pVg->currentOffset; /*reqOffset = pVg->currentOffset;*/
} else { /*} else {*/
/*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/ /*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/
/*tscError("unable to poll since no committed offset but reset offset is set to none");*/ /*tscError("unable to poll since no committed offset but reset offset is set to none");*/
/*return NULL;*/ /*return NULL;*/
/*}*/ /*}*/
reqOffset = tmq->resetOffsetCfg; /*reqOffset = tmq->resetOffsetCfg;*/
} /*}*/
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq)); SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
if (pReq == NULL) { if (pReq == NULL) {
@ -1544,7 +1500,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
pReq->timeout = timeout; pReq->timeout = timeout;
pReq->consumerId = tmq->consumerId; pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch; pReq->epoch = tmq->epoch;
pReq->currentOffset = reqOffset; /*pReq->currentOffset = reqOffset;*/
pReq->reqOffset = pVg->currentOffsetNew;
pReq->reqId = generateRequestId(); pReq->reqId = generateRequestId();
pReq->useSnapshot = tmq->useSnapshot; pReq->useSnapshot = tmq->useSnapshot;
@ -1572,7 +1529,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1; pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataBlkRsp)); memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
@ -1645,8 +1602,11 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); char offsetFormatBuf[50];
tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew);
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
@ -1695,7 +1655,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) { if (pollRspWrapper->dataRsp.blockNum == 0) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
@ -1717,7 +1677,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);

View File

@ -2300,7 +2300,6 @@ int32_t tDeserializeSServerVerRsp(void *buf, int32_t bufLen, SServerVerRsp *pRsp
return 0; return 0;
} }
int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) { int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -2387,7 +2386,6 @@ int32_t tDeserializeSDnodeListRsp(void *buf, int32_t bufLen, SDnodeListRsp *pRsp
void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); } void tFreeSDnodeListRsp(SDnodeListRsp *pRsp) { taosArrayDestroy(pRsp->dnodeList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) { int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -2921,7 +2919,6 @@ int32_t tDecodeSVariablesInfo(SDecoder* pDecoder, SVariablesInfo* pInfo) {
return 0; return 0;
} }
int32_t tSerializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) { int32_t tSerializeSShowVariablesRsp(void *buf, int32_t bufLen, SShowVariablesRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -5360,30 +5357,149 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
} }
} }
int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) { int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1; if (tEncodeI8(pEncoder, pOffsetVal->type) < 0) return -1;
if (pOffset->type == TMQ_OFFSET__SNAPSHOT) { if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1; if (tEncodeI64(pEncoder, pOffsetVal->uid) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->ts) < 0) return -1; if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1;
} else if (pOffset->type == TMQ_OFFSET__LOG) { } else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
if (tEncodeI64(pEncoder, pOffset->version) < 0) return -1; if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1;
} else if (pOffsetVal->type < 0) {
// do nothing
} else { } else {
ASSERT(0); ASSERT(0);
} }
return 0;
}
int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
if (tDecodeI8(pDecoder, &pOffsetVal->type) < 0) return -1;
if (pOffsetVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (tDecodeI64(pDecoder, &pOffsetVal->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1;
} else if (pOffsetVal->type < 0) {
// do nothing
} else {
ASSERT(0);
}
return 0;
}
#if 1
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
if (pVal->type == TMQ_OFFSET__RESET_NONE) {
snprintf(buf, maxLen, "offset(reset to none)");
} else if (pVal->type == TMQ_OFFSET__RESET_EARLIEAST) {
snprintf(buf, maxLen, "offset(reset to earlieast)");
} else if (pVal->type == TMQ_OFFSET__RESET_LATEST) {
snprintf(buf, maxLen, "offset(reset to latest)");
} else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else {
ASSERT(0);
}
return 0;
}
#endif
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
if (pLeft->type == pRight->type) {
if (pLeft->type == TMQ_OFFSET__LOG) {
return pLeft->version == pRight->version;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) {
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);
// TODO
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
}
}
return false;
}
int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) {
if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1;
if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1; if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
if (tDecodeI8(pDecoder, &pOffset->type) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pOffset->val) < 0) return -1;
if (pOffset->type == TMQ_OFFSET__SNAPSHOT) {
if (tDecodeI64(pDecoder, &pOffset->uid) < 0) return -1;
if (tDecodeI64(pDecoder, &pOffset->ts) < 0) return -1;
} else if (pOffset->type == TMQ_OFFSET__LOG) {
if (tDecodeI64(pDecoder, &pOffset->version) < 0) return -1;
} else {
ASSERT(0);
}
if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1; if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1;
return 0; return 0;
} }
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->skipLogNum) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1;
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void *data = taosArrayGetP(pRsp->blockData, i);
if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1;
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1;
}
if (pRsp->withTbName) {
char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i);
if (tEncodeCStr(pEncoder, tbName) < 0) return -1;
}
}
}
return 0;
}
int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->skipLogNum) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
uint64_t bLen;
if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1;
taosArrayPush(pRsp->blockData, &data);
int32_t len = bLen;
taosArrayPush(pRsp->blockDataLen, &len);
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1;
taosArrayPush(pRsp->blockSchema, &pSW);
}
if (pRsp->withTbName) {
char *tbName;
if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1;
taosArrayPush(pRsp->blockTbName, &tbName);
}
}
}
return 0;
}

View File

@ -139,19 +139,19 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
// tq // tq
typedef struct STqReadHandle STqReadHandle; typedef struct STqReadHandle SStreamReader;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); SStreamReader *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); void tqReadHandleSetColIdList(SStreamReader *pReadHandle, SArray *pColIdList);
int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleAddTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
int32_t tqReadHandleRemoveTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleRemoveTbUidList(SStreamReader *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(SStreamReader *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(SStreamReader *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(SStreamReader *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReadHandle *pHandle); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, SStreamReader *pHandle);
// sma // sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);

View File

@ -39,6 +39,16 @@ extern "C" {
#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on // clang-format on
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
@ -101,12 +111,13 @@ typedef struct {
typedef struct { typedef struct {
int8_t subType; int8_t subType;
STqReadHandle* pExecReader[5]; SStreamReader* pExecReader[5];
union { union {
STqExecCol execCol; STqExecCol execCol;
STqExecTb execTb; STqExecTb execTb;
STqExecDb execDb; STqExecDb execDb;
}; };
} STqExecHandle; } STqExecHandle;
typedef struct { typedef struct {
@ -149,9 +160,9 @@ static STqMgmt tqMgmt = {0};
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId); int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId);
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId); int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId);
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta // tqMeta
int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaOpen(STQ* pTq);

View File

@ -306,7 +306,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); SStreamReader *pReadHandle = tqInitSubmitMsgScanner(pMeta);
if (!pReadHandle) { if (!pReadHandle) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
@ -590,8 +590,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tdRefSmaStat(pSma, (SSmaStat *)pStat); tdRefSmaStat(pSma, (SSmaStat *)pStat);
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetStreamInput(pItem->taskInfo, &dataBlock, STREAM_INPUT__DATA_BLOCK, false);
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SSDATA_BLOCK); tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_BLOCK);
tdUnRefSmaStat(pSma, (SSmaStat *)pStat); tdUnRefSmaStat(pSma, (SSmaStat *)pStat);
@ -611,12 +611,12 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
pItem->taskInfo, suid); pItem->taskInfo, suid);
if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // STREAM_DATA_TYPE_SUBMIT_BLOCK if (qSetStreamInput(pItem->taskInfo, pMsg, inputType, true) < 0) { // INPUT__DATA_SUBMIT
smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma % " PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
tdFetchAndSubmitRSmaResult(pItem, STREAM_DATA_TYPE_SUBMIT_BLOCK); tdFetchAndSubmitRSmaResult(pItem, STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma)); smaDebug("vgId:%d, process rsma insert", SMA_VID(pSma));
@ -654,7 +654,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_INPUT__DATA_SUBMIT) {
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1); tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2); tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], suid, TSDB_RETENTION_L2);
} }
@ -675,7 +675,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_INPUT__DATA_SUBMIT) {
STbUidStore uidStore = {0}; STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore); tdFetchSubmitReqSuids(pMsg, &uidStore);

View File

@ -113,8 +113,23 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
return 0; return 0;
} }
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) { int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp); ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
if (pRsp->withSchema) {
ASSERT(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
} else {
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
}
int32_t len;
int32_t code;
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
if (code < 0) {
return -1;
}
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
return -1; return -1;
@ -125,18 +140,26 @@ int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
((SMqRspHead*)buf)->consumerId = pReq->consumerId; ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, pRsp);
SRpcMsg resp = { SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeSMqDataRsp(&encoder, pRsp);
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.pCont = buf, .pCont = buf,
.contLen = tlen, .contLen = tlen,
.code = 0, .code = 0,
}; };
tmsgSendRsp(&resp); tmsgSendRsp(&rsp);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", char buf1[50];
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, pRsp->reqOffset, pRsp->rspOffset); char buf2[50];
tFormatOffset(buf1, 50, &pRsp->reqOffset);
tFormatOffset(buf2, 50, &pRsp->rspOffset);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
return 0; return 0;
} }
@ -151,17 +174,17 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (offset.type == TMQ_OFFSET__SNAPSHOT) { if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
TD_VID(pTq->pVnode), offset.uid, offset.ts); TD_VID(pTq->pVnode), offset.val.uid, offset.val.ts);
} else if (offset.type == TMQ_OFFSET__LOG) { } else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey, tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
TD_VID(pTq->pVnode), offset.version); TD_VID(pTq->pVnode), offset.val.version);
} else { } else {
ASSERT(0); ASSERT(0);
} }
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
if (pOffset == NULL || pOffset->version < offset.version) { if (pOffset == NULL || pOffset->val.version < offset.val.version) {
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -171,6 +194,237 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
pRsp->reqOffset = pReq->reqOffset;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
return -1;
}
pRsp->withTbName = pReq->withTbName;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
if (pRsp->blockTbName == NULL) {
// TODO free
return -1;
}
}
if (subType == TOPIC_SUB_TYPE__COLUMN) {
pRsp->withSchema = false;
} else {
pRsp->withSchema = true;
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
if (pRsp->blockSchema == NULL) {
// TODO free
return -1;
}
}
return 0;
}
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid;
pOffsetVal->ts = ts;
}
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
pOffsetVal->type = TMQ_OFFSET__LOG;
pOffsetVal->version = ver;
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout;
int32_t reqEpoch = pReq->epoch;
int32_t code = 0;
STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew;
// 1.find handle
char buf[50];
tFormatOffset(buf, 50, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), buf);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
pReq->subKey);
return -1;
}
// check rebalance
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
return -1;
}
// update epoch if need
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
}
// 2.reset offset if needed
if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset;
} else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) {
fetchOffsetNew = pOffset->val;
char formatBuf[50];
tFormatOffset(formatBuf, 50, &fetchOffsetNew);
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot) {
if (!pHandle->fetchMeta) {
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
} else {
// reset to meta
ASSERT(0);
}
} else {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal));
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s, reset none failed", consumerId,
TD_VID(pTq->pVnode), pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
}
}
// 3.query
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1;
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
return -1;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) {
// TODO add push mgr
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}
SWalReadHead* pHead = &pHeadWithCkSum->head;
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp, workerId) < 0) {
ASSERT(0);
}
// TODO batch optimization:
// TODO continue scan until meeting batch requirement
if (dataRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
} else {
fetchVer++;
}
goto OVER;
} else {
ASSERT(pHandle->fetchMeta);
ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version;
/*tqOffsetResetToLog(&metaR)*/
metaRsp.rspOffset = fetchVer;
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
goto OVER;
}
code = 0;
goto OVER;
}
}
taosMemoryFree(pHeadWithCkSum);
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
// 1. set uid and ts
// 2. get data (rebuild reader if needed)
// 3. get new uid and ts
char formatBuf[50];
tFormatOffset(formatBuf, 50, &dataRsp.reqOffset);
tqInfo("retrieve using snapshot req offset %s", formatBuf);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, workerId) < 0) {
ASSERT(0);
}
// 4. send rsp
if (dataRsp.blockNum != 0) {
tqOffsetResetToData(&dataRsp.rspOffset, 0, 0);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
}
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);
}
OVER:
// TODO wrap in destroy func
taosArrayDestroy(dataRsp.blockDataLen);
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
if (dataRsp.withSchema) {
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (dataRsp.withTbName) {
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
}
return code;
}
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
@ -185,10 +439,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} else { } else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) { if (pOffset != NULL) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG); ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey, tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
TD_VID(pTq->pVnode), pOffset->version); TD_VID(pTq->pVnode), pOffset->val.version);
fetchOffset = pOffset->version + 1; fetchOffset = pOffset->val.version + 1;
} else { } else {
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal); fetchOffset = walGetFirstVer(pTq->pWal);
@ -241,12 +495,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (rsp.withTbName) { if (rsp.withTbName) {
rsp.blockTbName = taosArrayInit(0, sizeof(void*)); rsp.blockTbName = taosArrayInit(0, sizeof(void*));
} }
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
rsp.withSchema = false; rsp.withSchema = false;
rsp.withTag = false;
} else { } else {
rsp.withSchema = true; rsp.withSchema = true;
rsp.withTag = false;
rsp.blockSchema = taosArrayInit(0, sizeof(void*)); rsp.blockSchema = taosArrayInit(0, sizeof(void*));
} }
@ -302,10 +555,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
} else { } else {
ASSERT(pHandle->fetchMeta); ASSERT(pHandle->fetchMeta);
ASSERT(pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB || ASSERT(IS_META_MSG(pHead->msgType));
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE);
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->currentOffset; metaRsp.reqOffset = pReq->currentOffset;
@ -341,7 +591,7 @@ SEND_RSP:
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &rsp) < 0) {
code = -1; code = -1;
} }
OVER: OVER:
@ -359,6 +609,7 @@ OVER:
return code; return code;
} }
#endif
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
@ -403,7 +654,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.reader = pHandle->execHandle.pExecReader[i], .reader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
// .initTsdbReader = 1,
}; };
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]); ASSERT(pHandle->execHandle.execCol.task[i]);
@ -474,12 +724,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask->execType != TASK_EXEC__NONE) { if (pTask->execType != TASK_EXEC__NONE) {
// expand runners // expand runners
if (pTask->isDataScan) { if (pTask->isDataScan) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pStreamReader, .reader = pStreamReader,
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
// .initTsdbReader = 1,
}; };
/*pTask->exec.inputHandle = pStreamReader;*/ /*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);

View File

@ -15,7 +15,7 @@
#include "tq.h" #include "tq.h"
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRsp) { static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) return -1;
@ -37,7 +37,7 @@ static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataBlkRsp* pRs
return 0; return 0;
} }
static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataBlkRsp* pRsp) { static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerId, SMqDataRsp* pRsp) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
if (pSW == NULL) { if (pSW == NULL) {
return -1; return -1;
@ -46,10 +46,9 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
return 0; return 0;
} }
static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) { static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t workerId) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
if (metaGetTableEntryByUid(&mr, uid) < 0) { if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -60,13 +59,13 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD
return 0; return 0;
} }
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) { int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
// TODO set uid and ts
if (qStreamScanSnapshot(task) < 0) { if (qStreamScanSnapshot(task) < 0) {
ASSERT(0); ASSERT(0);
} }
// set version
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
@ -79,17 +78,24 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp
ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0); ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);
tqAddBlockDataToRsp(pDataBlock, pRsp); tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
// TODO
pRsp->withTbName = 0;
/*int64_t uid = 0;*/
/*tqAddTbNameToRsp(pTq, uid, pRsp, workerId);*/
}
pRsp->blockNum++; pRsp->blockNum++;
} }
return 0; return 0;
} }
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
ASSERT(task); ASSERT(task);
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(task, pReq, STREAM_INPUT__DATA_SUBMIT, false);
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
@ -102,13 +108,14 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
tqAddBlockDataToRsp(pDataBlock, pRsp); tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) { if (pRsp->withTbName) {
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId); int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
} }
pRsp->blockNum++; pRsp->blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { } else if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId]; SStreamReader* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0); tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
@ -118,14 +125,15 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
} }
tqAddBlockDataToRsp(&block, pRsp); tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) { if (pRsp->withTbName) {
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId); int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
} }
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1; pRsp->withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId]; SStreamReader* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pReq, 0); tqReadHandleSetMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
@ -135,7 +143,8 @@ int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkR
} }
tqAddBlockDataToRsp(&block, pRsp); tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) { if (pRsp->withTbName) {
tqAddTbNameToRsp(pTq, pExec, pRsp, workerId); int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp, workerId);
} }
tqAddBlockSchemaToRsp(pExec, workerId, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++; pRsp->blockNum++;

View File

@ -92,8 +92,8 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
} }
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG); /*ASSERT(pOffset->val.type == TMQ_OFFSET__LOG);*/
ASSERT(pOffset->version >= 0); /*ASSERT(pOffset->val.version >= 0);*/
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
} }

View File

@ -15,16 +15,17 @@
#include "tq.h" #include "tq.h"
#if 0
void tqTmrRspFunc(void* param, void* tmrId) { void tqTmrRspFunc(void* param, void* tmrId) {
STqHandle* pHandle = (STqHandle*)param; STqHandle* pHandle = (STqHandle*)param;
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
} }
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataBlkRsp* pRsp) { static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
SStreamDataSubmit* pSubmit = *ppSubmit; SStreamDataSubmit* pSubmit = *ppSubmit;
while (pSubmit != NULL) { while (pSubmit != NULL) {
ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1); ASSERT(pSubmit->ver == pHandle->pushHandle.processedVer + 1);
if (tqDataExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
// update processed // update processed
@ -43,7 +44,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
} }
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataBlkRsp rsp = {0}; SMqDataRsp rsp = {0};
// 1. guard and set status executing // 1. guard and set status executing
int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
TASK_EXEC_STATUS__EXECUTING); TASK_EXEC_STATUS__EXECUTING);
@ -175,13 +176,13 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
taosWLockLatch(&pHandle->pushHandle.lock); taosWLockLatch(&pHandle->pushHandle.lock);
SMqDataBlkRsp rsp = {0}; SMqDataRsp rsp = {0};
rsp.reqOffset = pHandle->pushHandle.reqOffset; rsp.reqOffset = pHandle->pushHandle.reqOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
tqDataExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
} else { } else {
// TODO // TODO
ASSERT(0); ASSERT(0);
@ -233,6 +234,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
return 0; return 0;
} }
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {

View File

@ -44,10 +44,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
} else { } else {
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SWalReadHead* pHead = &((*ppHeadWithCkSum)->head); SWalReadHead* pHead = &((*ppHeadWithCkSum)->head);
if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB || if (IS_META_MSG(pHead->msgType)) {
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum); code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
if (code < 0) { if (code < 0) {
@ -76,8 +73,8 @@ END:
return code; return code;
} }
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { SStreamReader* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle)); SStreamReader* pReadHandle = taosMemoryMalloc(sizeof(SStreamReader));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {
return NULL; return NULL;
} }
@ -85,15 +82,15 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->pMsg = NULL; pReadHandle->pMsg = NULL;
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL; pReadHandle->pColIdList = NULL;
pReadHandle->cachedSchemaVer = -1; pReadHandle->cachedSchemaVer = 0;
pReadHandle->cachedSchemaSuid = -1; pReadHandle->cachedSchemaSuid = 0;
pReadHandle->pSchema = NULL; pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL; pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL; pReadHandle->tbIdHash = NULL;
return pReadHandle; return pReadHandle;
} }
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { int32_t tqReadHandleSetMsg(SStreamReader* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg; pReadHandle->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
@ -108,7 +105,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
return 0; return 0;
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(SStreamReader* pHandle) {
if (pHandle->pMsg == NULL) return false; if (pHandle->pMsg == NULL) return false;
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
@ -130,7 +127,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false; return false;
} }
bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) { bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
@ -146,7 +143,7 @@ bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
return false; return false;
} }
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle) { int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
// TODO: cache multiple schema // TODO: cache multiple schema
int32_t sversion = htonl(pHandle->pBlock->sversion); int32_t sversion = htonl(pHandle->pBlock->sversion);
if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
@ -256,9 +253,9 @@ FAIL:
return -1; return -1;
} }
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } void tqReadHandleSetColIdList(SStreamReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
if (pHandle->tbIdHash) { if (pHandle->tbIdHash) {
taosHashClear(pHandle->tbIdHash); taosHashClear(pHandle->tbIdHash);
} }
@ -277,7 +274,7 @@ int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
return 0; return 0;
} }
int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
@ -294,7 +291,7 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
return 0; return 0;
} }
int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { int tqReadHandleRemoveTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
ASSERT(pHandle->tbIdHash != NULL); ASSERT(pHandle->tbIdHash != NULL);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {

View File

@ -780,7 +780,7 @@ _exit:
// TODO: the partial success scenario and the error case // TODO: the partial success scenario and the error case
// TODO: refactor // TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
} }
return 0; return 0;

View File

@ -44,12 +44,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// prevent setting a different type of block // prevent setting a different type of block
pInfo->blockType = type; pInfo->blockType = type;
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} else if (type == STREAM_DATA_TYPE_SSDATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
@ -60,9 +60,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
} }
} else if (type == STREAM_DATA_TYPE_FROM_SNAPSHOT) { } else if (type == STREAM_INPUT__DATA_SCAN) {
// do nothing // do nothing
ASSERT(pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT); ASSERT(pInfo->blockType == STREAM_INPUT__DATA_SCAN);
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -76,7 +76,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL); return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__DATA_SCAN, 0, NULL);
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {

View File

@ -962,7 +962,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor // TODO: refactor
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) { if (pInfo->validBlockIndex >= total) {
/*doClearBufferedBlocks(pInfo);*/ /*doClearBufferedBlocks(pInfo);*/
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
@ -973,7 +973,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
if (pBlock->info.type == STREAM_RETRIEVE) { if (pBlock->info.type == STREAM_RETRIEVE) {
pInfo->blockType = STREAM_DATA_TYPE_SUBMIT_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock); copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0; pInfo->pullDataResIndex = 0;
@ -981,7 +981,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} }
return pBlock; return pBlock;
} else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes); blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
@ -1133,7 +1133,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
// check reader last status
// if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL; return pResult && pResult->info.rows > 0 ? pResult : NULL;
@ -1213,7 +1215,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds); tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
if (code != 0) { if (code != 0) {

View File

@ -971,8 +971,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->scalarSupp.pExprInfo != NULL) { if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp; SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
pExprSup->numOfExprs, NULL);
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
@ -1512,7 +1511,8 @@ void increaseTs(SqlFunctionCtx* pCtx) {
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream) { STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, bool isStream) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -2527,7 +2527,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
continue; continue;
} else if (pBlock->info.type == STREAM_PUSH_EMPTY && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_PUSH_OVER && IS_FINAL_OP(pInfo)) {
processPushEmpty(pBlock, pInfo->pPullDataMap); processPushEmpty(pBlock, pInfo->pPullDataMap);
continue; continue;
} }

View File

@ -1052,7 +1052,8 @@ static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (!nodesExprHasColumn(pPara)) { if (!nodesExprHasColumn(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE"); return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns",
isUnique ? "UNIQUE" : "MODE");
} }
pFunc->node.resType = ((SExprNode*)pPara)->resType; pFunc->node.resType = ((SExprNode*)pPara)->resType;
@ -1639,7 +1640,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "leastsquares", .name = "leastsquares",
.type = FUNCTION_TYPE_LEASTSQUARES, .type = FUNCTION_TYPE_LEASTSQUARES,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateLeastSQR, .translateFunc = translateLeastSQR,
.getEnvFunc = getLeastSQRFuncEnv, .getEnvFunc = getLeastSQRFuncEnv,
.initFunc = leastSQRFunctionSetup, .initFunc = leastSQRFunctionSetup,

View File

@ -17,21 +17,20 @@
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
bool hasData = false;
// set input // set input
SStreamQueueItem* pItem = (SStreamQueueItem*)data; SStreamQueueItem* pItem = (SStreamQueueItem*)data;
if (pItem->type == STREAM_INPUT__TRIGGER) { if (pItem->type == STREAM_INPUT__TRIGGER) {
SStreamTrigger* pTrigger = (SStreamTrigger*)data; SStreamTrigger* pTrigger = (SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan); ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data; SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) { } else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop // TODO exec drop
return 0; return 0;
@ -47,19 +46,15 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
if (output == NULL) { if (output == NULL) {
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0}; SSDataBlock block = {0};
/*block.info.type = STREAM_PUSH_EMPTY;*/
// block.info.childId = pTask->selfChildId;
SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data; SStreamDataBlock* pRetrieveBlock = (SStreamDataBlock*)data;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
/*SSDataBlock* pBlock = createOneDataBlock(taosArrayGet(pRetrieveBlock->blocks, 0), true);*/
assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PUSH_EMPTY; block.info.type = STREAM_PUSH_OVER;
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
} }
break; break;
} }
hasData = true;
if (output->info.type == STREAM_RETRIEVE) { if (output->info.type == STREAM_RETRIEVE) {
if (streamBroadcastToChildren(pTask, output) < 0) { if (streamBroadcastToChildren(pTask, output) < 0) {
@ -73,9 +68,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
/*SSDataBlock* outputCopy = createOneDataBlock(output, true);*/
/*outputCopy->info.childId = pTask->selfChildId;*/
/*taosArrayPush(pRes, outputCopy);*/
} }
return 0; return 0;
} }
@ -164,4 +156,3 @@ FAIL:
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return -1; return -1;
} }

View File

@ -86,9 +86,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
ASSERT(pTask->sinkType == TASK_SINK__NONE); ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
@ -147,9 +145,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
ASSERT(pTask->sinkType == TASK_SINK__NONE); ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;