Merge pull request #12563 from taosdata/feature/stream
enh(tmq): cascade drop
This commit is contained in:
commit
44b75e6dea
|
@ -2097,6 +2097,18 @@ enum {
|
|||
TOPIC_SUB_TYPE__TABLE,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t leftForVer;
|
||||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
} SMqVDeleteReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
} SMqVDeleteRsp;
|
||||
|
||||
typedef struct {
|
||||
int64_t leftForVer;
|
||||
int32_t vgId;
|
||||
|
@ -2532,11 +2544,15 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
|
|||
}
|
||||
|
||||
typedef struct {
|
||||
void* data;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int32_t sourceVg;
|
||||
int64_t sourceVer;
|
||||
SArray* data; // SArray<SSDataBlock>
|
||||
} SStreamDispatchReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t status;
|
||||
int8_t inputStatus;
|
||||
} SStreamDispatchRsp;
|
||||
|
||||
#define TD_AUTO_CREATE_TABLE 0x1
|
||||
|
|
|
@ -178,6 +178,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
||||
|
|
|
@ -281,6 +281,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E8)
|
||||
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
|
||||
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
|
||||
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
|
||||
|
||||
// mnode-stream
|
||||
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
|
||||
|
|
|
@ -216,6 +216,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -308,6 +308,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -459,6 +459,7 @@ typedef struct {
|
|||
char* ast;
|
||||
char* physicalPlan;
|
||||
SSchemaWrapper schema;
|
||||
int32_t refConsumerCnt;
|
||||
} SMqTopicObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -38,6 +38,9 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
|
|||
}
|
||||
|
||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
|
||||
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
|||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
|
||||
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -35,6 +35,8 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|||
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
||||
|
||||
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -399,6 +399,9 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
|||
|
||||
int32_t newTopicNum = taosArrayGetSize(newSub);
|
||||
// check topic existance
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
||||
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
char *topic = taosArrayGetP(newSub, i);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
|
@ -406,7 +409,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
|||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
goto SUBSCRIBE_OVER;
|
||||
}
|
||||
// TODO lock topic to prevent drop
|
||||
|
||||
// ref topic to prevent drop
|
||||
// TODO make topic complete
|
||||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
||||
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
}
|
||||
|
||||
|
@ -422,8 +432,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
|||
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
|
||||
|
||||
|
@ -494,8 +502,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
|||
goto SUBSCRIBE_OVER;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
|
||||
}
|
||||
|
@ -503,6 +509,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
|||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
|
||||
SUBSCRIBE_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (pConsumerOld) {
|
||||
/*taosRUnLockLatch(&pConsumerOld->lock);*/
|
||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||
|
|
|
@ -50,6 +50,14 @@ int32_t mndInitOffset(SMnode *pMnode) {
|
|||
|
||||
void mndCleanupOffset(SMnode *pMnode) {}
|
||||
|
||||
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
|
||||
int32_t i = 0;
|
||||
while (pOffset->key[i] != ':') i++;
|
||||
while (pOffset->key[i] != ':') i++;
|
||||
if (strcmp(&pOffset->key[i + 1], topic) == 0) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
@ -134,10 +142,11 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa
|
|||
int32_t sz = taosArrayGetSize(vgs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int32_t vgId = *(int32_t *)taosArrayGet(vgs, i);
|
||||
SMqOffsetObj offsetObj;
|
||||
SMqOffsetObj offsetObj = {0};
|
||||
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) {
|
||||
return -1;
|
||||
}
|
||||
// TODO assign db
|
||||
offsetObj.offset = -1;
|
||||
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
|
||||
if (pOffsetRaw == NULL) {
|
||||
|
@ -240,6 +249,14 @@ static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOff
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
|
||||
SSdbRaw *pRedoRaw = mndOffsetActionEncode(pOffset);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -247,7 +264,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
void *pIter = NULL;
|
||||
SMqOffsetObj *pOffset = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset);
|
||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pOffset->dbUid != pDb->uid) {
|
||||
|
@ -256,8 +273,39 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
}
|
||||
|
||||
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
|
||||
sdbRelease(pSdb, pOffset);
|
||||
goto END;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pOffset);
|
||||
}
|
||||
|
||||
code = 0;
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqOffsetObj *pOffset = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (!mndOffsetFromTopic(pOffset, topic)) {
|
||||
sdbRelease(pSdb, pOffset);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) {
|
||||
sdbRelease(pSdb, pOffset);
|
||||
goto END;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pOffset);
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
|
|
@ -73,6 +73,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
|
||||
|
@ -389,7 +390,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
|
||||
static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -458,6 +459,20 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
|
|||
goto REB_FAIL;
|
||||
}
|
||||
}
|
||||
if (consumerNum) {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
if (pTopic) {
|
||||
// TODO make topic complete
|
||||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
|
||||
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
|
||||
}
|
||||
}
|
||||
|
||||
// 4. TODO commit log: modification log
|
||||
|
||||
// 5. set cb
|
||||
|
@ -688,6 +703,14 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
|
@ -712,6 +735,57 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
}
|
||||
|
||||
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||
sdbRelease(pSdb, pSub);
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
code = 0;
|
||||
END:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
||||
int32_t code = -1;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqSubscribeObj *pSub = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
|
||||
if (strcmp(topic, topicName) != 0) {
|
||||
sdbRelease(pSdb, pSub);
|
||||
continue;
|
||||
}
|
||||
|
||||
// iter all vnode to delete handle
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
||||
pReq->head.vgId = htonl(pVgEp->vgId);
|
||||
pReq->vgId = pVgEp->vgId;
|
||||
pReq->consumerId = -1;
|
||||
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
STransAction action = {0};
|
||||
action.epSet = pVgEp->epSet;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SMqVDeleteReq);
|
||||
action.msgType = TDMT_VND_MQ_VG_DELETE;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
||||
sdbRelease(pSdb, pSub);
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndOffset.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
|
@ -106,6 +108,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
|
||||
SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||
|
@ -190,6 +193,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
@ -220,11 +225,13 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
|
|||
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
|
||||
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
|
||||
|
||||
taosWLockLatch(&pOldTopic->lock);
|
||||
atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);
|
||||
|
||||
/*taosWLockLatch(&pOldTopic->lock);*/
|
||||
|
||||
// TODO handle update
|
||||
|
||||
taosWUnLockLatch(&pOldTopic->lock);
|
||||
/*taosWUnLockLatch(&pOldTopic->lock);*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -292,6 +299,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
|
|||
topicObj.version = 1;
|
||||
topicObj.sql = strdup(pCreate->sql);
|
||||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||
topicObj.refConsumerCnt = 0;
|
||||
|
||||
if (pCreate->ast && pCreate->ast[0]) {
|
||||
topicObj.ast = strdup(pCreate->ast);
|
||||
|
@ -436,15 +444,7 @@ CREATE_TOPIC_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
|
||||
// TODO: cannot drop when subscribed
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SNodeMsg *pReq, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
|
@ -465,6 +465,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic)
|
|||
|
||||
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pNode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMDropTopicReq dropReq = {0};
|
||||
|
||||
if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
|
||||
|
@ -485,10 +486,35 @@ static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
// TODO: check ref
|
||||
|
||||
int32_t code = mndDropTopic(pMnode, pReq, pTopic);
|
||||
// TODO: iterate and drop related subscriptions and offsets
|
||||
// check ref
|
||||
if (pTopic->refConsumerCnt != 0) {
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
#if 1
|
||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -577,6 +603,15 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
|
|
|
@ -116,6 +116,7 @@ void tqClose(STQ*);
|
|||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqCommit(STQ*);
|
||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||
|
|
|
@ -861,6 +861,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||
|
||||
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
||||
ASSERT(code == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO: persist meta into tdb
|
||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SMqRebVgReq req = {0};
|
||||
|
@ -1087,35 +1095,6 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SStreamTaskExecReq req = {0};
|
||||
tDecodeSStreamTaskExecReq(msg, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
ASSERT(pTask);
|
||||
ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
|
||||
|
||||
// enqueue
|
||||
int32_t inputStatus = streamEnqueueDataBlk(pTask, (SStreamDataBlock*)req.data);
|
||||
if (inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO rsp blocked
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
// try exec
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
if (streamTaskRun(pTask) < 0) {
|
||||
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
|
||||
|
||||
goto FAIL;
|
||||
}
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO rsp success
|
||||
return 0;
|
||||
FAIL:
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -101,6 +101,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
|||
// TODO: handle error
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_MQ_VG_DELETE:
|
||||
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_TASK_DEPLOY: {
|
||||
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
|
|
|
@ -35,7 +35,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
|
|||
return (void*)buf;
|
||||
}
|
||||
|
||||
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||
SStreamTaskExecReq req = {
|
||||
.streamId = pTask->streamId,
|
||||
.data = data,
|
||||
|
@ -107,7 +107,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
|
|||
SArray* pData = *(SArray**)pIter;
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet;
|
||||
if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
|
||||
if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
|
|||
return inputStatus;
|
||||
}
|
||||
|
||||
int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
|
||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
|
||||
// set input
|
||||
|
@ -265,87 +265,42 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t streamTaskDispatchDown(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskSink(SStreamTask* pTask) {
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* pBlock, SRpcMsg* pRsp) {
|
||||
// 1. handle input
|
||||
// 1.1 enqueue
|
||||
taosWriteQitem(pTask->inputQ, pBlock);
|
||||
// 1.2 calc back pressure
|
||||
// 1.3 rsp by input status
|
||||
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
|
||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
||||
pCont->status = inputStatus;
|
||||
pRsp->pCont = pCont;
|
||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
// 2. try exec
|
||||
// 2.1. idle: exec
|
||||
// 2.2. executing: return
|
||||
// 2.3. closing: keep trying
|
||||
int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
bool firstRun = 1;
|
||||
while (1) {
|
||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
if (execStatus == TASK_STATUS__IDLE) {
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
SArray* pRes = taosArrayInit(0, sizeof(void*));
|
||||
const SArray* blocks = pBlock->blocks;
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
SStreamDataBlock* pBlock = NULL;
|
||||
if (!firstRun) {
|
||||
taosReadAllQitems(pTask->outputQ, pTask->outputQAll);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
// TODO: wrap destroy block
|
||||
taosArrayDestroyP(pBlock->blocks, (FDelete)blockDataDestroy);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
*resQ = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
}
|
||||
|
||||
} else if (execStatus == TASK_STATUS__CLOSING) {
|
||||
taosGetQitem(pTask->outputQAll, (void**)&pBlock);
|
||||
if (pBlock == NULL) {
|
||||
if (firstRun) {
|
||||
firstRun = 0;
|
||||
continue;
|
||||
} else if (execStatus == TASK_STATUS__EXECUTING)
|
||||
} else {
|
||||
break;
|
||||
else {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
// 3. handle output
|
||||
// 3.1 check and set status
|
||||
// 3.2 dispatch / sink
|
||||
STaosQall* qall = taosAllocateQall();
|
||||
taosReadAllQitems(pTask->outputQ, qall);
|
||||
SArray** ppRes = NULL;
|
||||
while (1) {
|
||||
taosGetQitem(qall, (void**)&ppRes);
|
||||
if (ppRes == NULL) break;
|
||||
|
||||
SArray* pRes = *ppRes;
|
||||
SArray* pRes = pBlock->blocks;
|
||||
|
||||
// sink
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->sourceVer, pRes);
|
||||
// blockDebugShowData(pRes);
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
||||
//
|
||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
||||
//
|
||||
} else {
|
||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||
}
|
||||
|
||||
// dispatch
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
@ -366,7 +321,7 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
|
|||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
@ -401,12 +356,53 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
|
|||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
}
|
||||
}
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, char* msg, int32_t msgLen) {
|
||||
//
|
||||
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
int8_t status;
|
||||
|
||||
// 1.1 update status
|
||||
// TODO cal backpressure
|
||||
if (pBlock == NULL) {
|
||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||
status = TASK_INPUT_STATUS__FAILED;
|
||||
} else {
|
||||
status = atomic_load_8(&pTask->inputStatus);
|
||||
}
|
||||
|
||||
// 1.2 enqueue
|
||||
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
|
||||
pBlock->sourceVg = pReq->sourceVg;
|
||||
pBlock->sourceVer = pReq->sourceVer;
|
||||
taosWriteQitem(pTask->inputQ, pBlock);
|
||||
|
||||
// 1.3 rsp by input status
|
||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
||||
pCont->inputStatus = status;
|
||||
pRsp->pCont = pCont;
|
||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
// 1. handle input
|
||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||
|
||||
// 2. try exec
|
||||
// 2.1. idle: exec
|
||||
// 2.2. executing: return
|
||||
// 2.3. closing: keep trying
|
||||
streamTaskExec2(pTask, pMsgCb);
|
||||
|
||||
// 3. handle output
|
||||
// 3.1 check and set status
|
||||
// 3.2 dispatch / sink
|
||||
streamTaskSink(pTask, pMsgCb);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -415,64 +411,6 @@ int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskRun(SStreamTask* pTask) {
|
||||
SArray* pRes = NULL;
|
||||
if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) {
|
||||
// TODO remove multi runner
|
||||
void* exec = pTask->exec.runners[0].executor;
|
||||
|
||||
int8_t status = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||
if (status == TASK_STATUS__IDLE) {
|
||||
pRes = taosArrayInit(0, sizeof(void*));
|
||||
if (pRes == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void* input = NULL;
|
||||
taosWriteQitem(pTask->inputQ, &input);
|
||||
if (input == NULL) return 0;
|
||||
|
||||
// TODO: fix type
|
||||
if (pTask->sourceType == TASK_SOURCE__SCAN) {
|
||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
|
||||
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
streamDataSubmitRefDec(pSubmit);
|
||||
} else {
|
||||
SStreamDataBlock* pStreamBlock = (SStreamDataBlock*)input;
|
||||
const SArray* blocks = pStreamBlock->blocks;
|
||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
||||
while (1) {
|
||||
SSDataBlock* output;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
ASSERT(false);
|
||||
}
|
||||
if (output == NULL) break;
|
||||
taosArrayPush(pRes, &output);
|
||||
}
|
||||
// TODO: wrap destroy block
|
||||
taosArrayDestroyP(pStreamBlock->blocks, (FDelete)blockDataDestroy);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
|
||||
*resQ = pRes;
|
||||
taosWriteQitem(pTask->outputQ, resQ);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
|
||||
SArray* pRes = NULL;
|
||||
// source
|
||||
|
@ -545,7 +483,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
|||
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
@ -566,7 +504,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
|
|||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -285,6 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid qu
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
|
||||
|
||||
// mnode-sma
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
|
||||
|
|
Loading…
Reference in New Issue