Merge pull request #14046 from taosdata/feature/stream

feat(stream): drop stream
This commit is contained in:
Liu Jicong 2022-06-21 15:18:30 +08:00 committed by GitHub
commit 1644294e25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 427 additions and 147 deletions

View File

@ -252,6 +252,16 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
};
typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);

View File

@ -34,6 +34,7 @@ enum {
enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__END_RSP,
};

View File

@ -1560,6 +1560,7 @@ typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists;
int8_t subType;
int8_t withMeta;
char* sql;
char subDbName[TSDB_DB_FNAME_LEN];
union {
@ -2306,6 +2307,7 @@ typedef struct {
int64_t newConsumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
int8_t withMeta;
char* qmsg;
int64_t suid;
} SMqRebVgReq;
@ -2318,6 +2320,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
tlen += taosEncodeString(buf, pReq->subKey);
tlen += taosEncodeFixedI8(buf, pReq->subType);
tlen += taosEncodeFixedI8(buf, pReq->withMeta);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
tlen += taosEncodeString(buf, pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
@ -2333,6 +2336,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
buf = taosDecodeStringTo(buf, pReq->subKey);
buf = taosDecodeFixedI8(buf, &pReq->subType);
buf = taosDecodeFixedI8(buf, &pReq->withMeta);
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
@ -2677,6 +2681,34 @@ static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosArrayDestroy(pSubTopicEp->vgs);
}
typedef struct {
SMqRspHead head;
int64_t reqOffset;
int64_t rspOffset;
int16_t resMsgType;
int32_t metaRspLen;
void* metaRsp;
} SMqMetaRsp;
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
return (void*)buf;
}
typedef struct {
SMqRspHead head;
int64_t reqOffset;

View File

@ -187,7 +187,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_DROP, "vnode-stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
@ -204,6 +203,7 @@ enum {
//shared by snode and vnode
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)

View File

@ -30,9 +30,14 @@ extern "C" {
typedef struct SStreamTask SStreamTask;
enum {
TASK_STATUS__IDLE = 1,
TASK_STATUS__EXECUTING,
TASK_STATUS__CLOSING,
TASK_STATUS__NORMAL = 0,
TASK_STATUS__DROPPING,
};
enum {
TASK_EXEC_STATUS__IDLE = 1,
TASK_EXEC_STATUS__EXECUTING,
TASK_EXEC_STATUS__CLOSING,
};
enum {
@ -50,16 +55,12 @@ enum {
TASK_OUTPUT_STATUS__BLOCKED,
};
enum {
STREAM_CREATED_BY__USER = 1,
STREAM_CREATED_BY__SMA,
};
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
};
typedef struct {
@ -237,7 +238,9 @@ struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t inputType;
int8_t status;
int8_t taskStatus;
int8_t execStatus;
int8_t execType;
int8_t sinkType;

View File

@ -20,9 +20,9 @@
extern "C" {
#endif
#include "catalog.h"
#include "parser.h"
#include "planner.h"
#include "catalog.h"
#include "query.h"
#include "taos.h"
#include "tcommon.h"
@ -51,10 +51,12 @@ extern "C" {
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
RES_TYPE__TMQ_META,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
typedef struct SAppInstInfo SAppInstInfo;
@ -66,9 +68,9 @@ typedef struct {
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
SRWLatch lock; // lock is used in serialization
SAppInstInfo* pAppInstInfo;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
@ -76,13 +78,13 @@ typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req);
typedef struct {
int8_t inited;
int64_t appId;
int8_t inited;
int64_t appId;
// ctl
int8_t threadStop;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
SHashObj *appSummary;
TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary;
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[CONN_TYPE__MAX];
FHbRspHandle rspHandle[CONN_TYPE__MAX];
@ -129,7 +131,7 @@ typedef struct STscObj {
int8_t connType;
int32_t acctId;
uint32_t connId;
TAOS *id; // ref ID returned by taosAddRef
TAOS* id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
@ -188,6 +190,14 @@ typedef struct {
SReqResultInfo resInfo;
} SMqRspObj;
typedef struct {
int8_t resType;
char topic[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
int32_t vgId;
SMqMetaRsp metaRsp;
} SMqMetaRspObj;
typedef struct SRequestObj {
int8_t resType; // query or tmq
uint64_t requestId;
@ -206,9 +216,9 @@ typedef struct SRequestObj {
SRequestSendRecvBody body;
bool stableQuery;
bool killed;
uint32_t prevCode; //previous error code: todo refactor, add update flag for catalog
uint32_t retry;
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
} SRequestObj;
typedef struct SSyncQueryParam {
@ -216,15 +226,15 @@ typedef struct SSyncQueryParam {
SRequestObj* pRequest;
} SSyncQueryParam;
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
@ -289,7 +299,7 @@ bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
@ -299,7 +309,7 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
void taos_close_internal(void *taos);
void taos_close_internal(void* taos);
// --- heartbeat
// global, called by mgmt
@ -320,12 +330,12 @@ void hbMgrInitMqHbRspHandle();
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData *pResultMeta);
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
void doAsyncQuery(SRequestObj* pRequest, bool forceUpdateMeta);
int32_t removeMeta(STscObj* pTscObj, SArray* tbList);// todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog);// todo move to xxx
int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clientImpl.c and become a static function
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
bool qnodeRequired(SRequestObj* pRequest);
#ifdef __cplusplus

View File

@ -17,6 +17,7 @@
#include "clientInt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "functionMgt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
@ -25,7 +26,6 @@
#include "tref.h"
#include "trpc.h"
#include "version.h"
#include "functionMgt.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
@ -97,11 +97,11 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
pass = TSDB_DEFAULT_PASS;
}
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
return pObj->id;
}
return NULL;
}
@ -111,41 +111,40 @@ void taos_close_internal(void *taos) {
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t*)pTscObj->id, pTscObj->numOfReqs);
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t *)pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(clientConnRefPool, *(int64_t*)pTscObj->id);
taosRemoveRef(clientConnRefPool, *(int64_t *)pTscObj->id);
}
void taos_close(TAOS *taos) {
if (taos == NULL) {
return;
}
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
return;
}
taos_close_internal(pObj);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
taosMemoryFree(taos);
}
int taos_errno(TAOS_RES *tres) {
if (tres == NULL) {
int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return terrno;
}
if (TD_RES_TMQ(tres)) {
if (TD_RES_TMQ(res)) {
return 0;
}
return ((SRequestObj *)tres)->code;
return ((SRequestObj *)res)->code;
}
const char *taos_errstr(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return (const char *)tstrerror(terrno);
}
@ -179,11 +178,15 @@ void taos_free_result(TAOS_RES *res) {
if (pRsp->rsp.withSchema) taosArrayDestroyP(pRsp->rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res;
taosMemoryFree(pRspObj->metaRsp.metaRsp);
taosMemoryFree(pRspObj);
}
}
int taos_field_count(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
@ -194,7 +197,7 @@ int taos_field_count(TAOS_RES *res) {
int taos_num_fields(TAOS_RES *res) { return taos_field_count(res); }
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (taos_num_fields(res) == 0) {
if (taos_num_fields(res) == 0 || TD_RES_TMQ_META(res)) {
return NULL;
}
@ -215,8 +218,8 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
@ -229,21 +232,21 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
taos_query_a(taos, sql, syncQueryFn, param);
tsem_wait(&param->sem);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen);
TAOS_RES *pRes = execQuery(pTscObj, sql, sqlLen);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return pRes;
#endif
@ -380,7 +383,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
}
int *taos_fetch_lengths(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return NULL;
}
@ -389,7 +392,7 @@ int *taos_fetch_lengths(TAOS_RES *res) {
}
TAOS_ROW *taos_result_block(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
@ -438,7 +441,7 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res)) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) {
return 0;
}
@ -448,7 +451,7 @@ int taos_affected_rows(TAOS_RES *res) {
}
int taos_result_precision(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return TSDB_TIME_PRECISION_MILLI;
}
@ -463,15 +466,15 @@ int taos_result_precision(TAOS_RES *res) {
}
int taos_select_db(TAOS *taos, const char *db) {
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (pObj == NULL) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return TSDB_CODE_TSC_DISCONNECTED;
}
if (db == NULL || strlen(db) == 0) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
return terrno;
}
@ -483,19 +486,19 @@ int taos_select_db(TAOS *taos, const char *db) {
int32_t code = taos_errno(pRequest);
taos_free_result(pRequest);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return code;
}
void taos_stop_query(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res)) {
return;
}
SRequestObj *pRequest = (SRequestObj *)res;
pRequest->killed = true;
int32_t numOfFields = taos_num_fields(pRequest);
int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop.
if (numOfFields == 0) {
tscDebug("request %" PRIx64 " no need to be killed since not query", pRequest->requestId);
@ -510,6 +513,9 @@ void taos_stop_query(TAOS_RES *res) {
}
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return true;
}
SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) {
return true;
@ -532,7 +538,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
@ -575,7 +581,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
}
int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
@ -615,7 +621,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ_META(res)) {
return 0;
}
@ -636,7 +642,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
void taos_reset_current_db(TAOS *taos) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return;
@ -644,17 +650,17 @@ void taos_reset_current_db(TAOS *taos) {
resetConnectDB(pTscObj);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
}
const char *taos_get_server_info(TAOS *taos) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
return pTscObj->ver;
}
@ -682,7 +688,7 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug("enter meta callback, code %s", tstrerror(code));
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
@ -723,11 +729,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(*(int64_t *)taos);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
@ -745,7 +751,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
}
SRequestObj *pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
fp(param, NULL, terrno);
@ -849,8 +855,8 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->requestId);
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId);
pResultInfo->pData = pResult;
pResultInfo->numOfRows = 0;
@ -884,6 +890,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp;
@ -910,6 +917,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
@ -923,6 +931,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
const void *taos_get_raw_block(TAOS_RES *res) {
ASSERT(res != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
return pRequest->body.resInfo.pData;
@ -949,16 +958,16 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
TAOS_STMT *taos_stmt_init(TAOS *taos) {
STscObj* pObj = acquireTscObj(*(int64_t*)taos);
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
TAOS_STMT* pStmt = stmtInit(pObj);
releaseTscObj(*(int64_t*)taos);
TAOS_STMT *pStmt = stmtInit(pObj);
releaseTscObj(*(int64_t *)taos);
return pStmt;
}

View File

@ -149,7 +149,10 @@ typedef struct {
int32_t epoch;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
SMqDataBlkRsp msg;
union {
SMqDataBlkRsp dataRsp;
SMqMetaRsp metaRsp;
};
} SMqPollRspWrapper;
typedef struct {
@ -1131,6 +1134,11 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
}
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
}
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
@ -1138,17 +1146,23 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto CREATE_MSG_FAIL;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
pRspWrapper->tmqRspType = rspType;
pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic;
memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
} else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
}
tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
taosMemoryFree(pMsg->pData);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset);
taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem);
@ -1516,6 +1530,17 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
return pReq;
}
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
pRspObj->resType = RES_TYPE__TMQ;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
return pRspObj;
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
@ -1523,11 +1548,11 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataBlkRsp));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
if (!pWrapper->msg.withSchema) {
if (!pWrapper->dataRsp.withSchema) {
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
}
@ -1643,12 +1668,12 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->msg.head.epoch == consumerEpoch) {
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->msg.rspOffset;
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->msg.blockNum == 0) {
if (pollRspWrapper->dataRsp.blockNum == 0) {
taosFreeQitem(pollRspWrapper);
rspWrapper = NULL;
continue;
@ -1658,8 +1683,25 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pollRspWrapper->msg.head.epoch,
consumerEpoch);
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
taosFreeQitem(pollRspWrapper);
}
} else {
@ -1747,10 +1789,23 @@ const char* tmq_err2str(int32_t err) {
}
}
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
return TMQ_RES_DATA;
} else if (TD_RES_TMQ_META(res)) {
return TMQ_RES_TABLE_META;
} else {
return TMQ_RES_INVALID;
}
}
const char* tmq_get_topic_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->topic, '.') + 1;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->topic, '.') + 1;
} else {
return NULL;
}
@ -1760,6 +1815,9 @@ const char* tmq_get_db_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return strchr(pRspObj->db, '.') + 1;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return strchr(pMetaRspObj->db, '.') + 1;
} else {
return NULL;
}
@ -1769,6 +1827,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
return pRspObj->vgId;
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return pMetaRspObj->vgId;
} else {
return -1;
}
@ -1786,6 +1847,16 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL;
}
int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) {
if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
*raw_meta = pMetaRspObj->metaRsp.metaRsp;
*raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
return 0;
}
return -1;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

View File

@ -2956,6 +2956,7 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI8(&encoder, pReq->subType) < 0) return -1;
if (tEncodeI8(&encoder, pReq->withMeta) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1;
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
@ -2985,6 +2986,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->withMeta) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1;
if (TOPIC_SUB_TYPE__DB == pReq->subType) {
} else if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {

View File

@ -216,7 +216,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -95,7 +95,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;

View File

@ -351,7 +351,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;

View File

@ -420,7 +420,8 @@ typedef struct {
int64_t uid;
int64_t dbUid;
int32_t version;
int8_t subType; // column, db or stable
int8_t subType; // column, db or stable
int8_t withMeta; // TODO
SRWLatch lock;
int32_t sqlLen;
int32_t astLen;
@ -487,6 +488,7 @@ typedef struct {
int64_t dbUid;
int32_t vgNum;
int8_t subType;
int8_t withMeta;
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*>

View File

@ -381,6 +381,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew->dbUid = pSub->dbUid;
pSubNew->stbUid = pSub->stbUid;
pSubNew->subType = pSub->subType;
pSubNew->withMeta = pSub->withMeta;
pSubNew->vgNum = pSub->vgNum;
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
@ -414,6 +415,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType);
tlen += taosEncodeFixedI8(buf, pSub->withMeta);
tlen += taosEncodeFixedI64(buf, pSub->stbUid);
void *pIter = NULL;
@ -440,6 +442,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType);
buf = taosDecodeFixedI8(buf, &pSub->withMeta);
buf = taosDecodeFixedI64(buf, &pSub->stbUid);
int32_t sz;

View File

@ -54,9 +54,10 @@ int32_t mndInitStream(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
@ -477,7 +478,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP;
action.msgType = TDMT_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
@ -670,20 +671,24 @@ _OVER:
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamReq dropReq = *(SMDropStreamReq *)pReq->pCont;
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
ASSERT(0);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) {
if (dropReq.igNotExists) {
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
code = 0;
goto DROP_STREAM_OVER;
sdbRelease(pMnode->pSdb, pStream);
return -1;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
@ -701,14 +706,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
return code;
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
return code;
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
// drop stream
@ -717,8 +724,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
DROP_STREAM_OVER:
return code;
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {

View File

@ -96,6 +96,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub->dbUid = pTopic->dbUid;
pSub->stbUid = pTopic->stbUid;
pSub->subType = pTopic->subType;
pSub->withMeta = pTopic->withMeta;
ASSERT(pSub->unassignedVgs->size == 0);
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
@ -120,6 +121,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = pRebVg->pVgEp->qmsg;
req.subType = pSub->subType;
req.withMeta = pSub->withMeta;
req.suid = pSub->stbUid;
strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

View File

@ -141,6 +141,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
@ -208,6 +209,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
@ -357,6 +359,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.subType = pCreate->subType;
topicObj.withMeta = pCreate->withMeta;
if (topicObj.withMeta) {
ASSERT(topicObj.subType != TOPIC_SUB_TYPE__COLUMN);
}
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
topicObj.ast = strdup(pCreate->ast);

View File

@ -92,7 +92,7 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
}
tDecoderClear(&decoder);
pTask->status = TASK_STATUS__IDLE;
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
@ -205,7 +205,7 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY:
return sndProcessTaskDeployReq(pSnode, pMsg);
case TDMT_VND_STREAM_TASK_DROP:
case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pMsg);
default:
ASSERT(0);

View File

@ -114,6 +114,7 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t fetchMeta;
// reader
SWalReadHandle* pWalReader;

View File

@ -85,6 +85,34 @@ void tqClose(STQ* pTq) {
}
}
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqMetaRsp(NULL, pRsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqMetaRsp(&abuf, pRsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, res msg type %d, reqOffset: %ld, rspOffset: %ld",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->reqOffset, pRsp->rspOffset);
return 0;
}
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp) {
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, pRsp);
void* buf = rpcMallocCont(tlen);
@ -250,8 +278,23 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*ASSERT(0);*/
}
} else {
// TODO
ASSERT(0);
ASSERT(pHandle->fetchMeta);
ASSERT(pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB ||
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE);
// return
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->currentOffset;
metaRsp.rspOffset = fetchOffset;
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
}
code = 0;
goto OVER;
}
// TODO batch optimization:
@ -276,7 +319,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {
code = -1;
}
OVER:
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
@ -384,7 +427,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
}
tDecoderClear(&decoder);
pTask->status = TASK_STATUS__IDLE;
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
@ -459,6 +502,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
continue;
}
if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;
if (!failed) {
@ -487,6 +533,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRunReq(pTask);
return 0;
}
@ -501,9 +550,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp);
return 0;
@ -513,6 +565,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRecoverReq(pTask, pReq, pMsg);
return 0;
}
@ -521,6 +576,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessDispatchRsp(pTask, pRsp);
return 0;
}
@ -529,16 +587,32 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRecoverRsp(pTask, pRsp);
return 0;
}
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
// todo
// clear queue
// push drop req into queue
// launch exec to free memory
// remove from hash
return 0;
#if 0
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
// set status dropping
ASSERT(code == 0);
if (code == 0) {
// sendrsp
}
return code;
#endif
}

View File

@ -45,9 +45,9 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataBlkRsp rsp = {0};
// 1. guard and set status executing
int8_t execStatus =
atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) {
int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
SStreamDataSubmit* pSubmit = NULL;
// 2. check processedVer
// 2.1. if not missed, get msg from queue
@ -68,18 +68,18 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
goto SEND_RSP;
}
// set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING);
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING);
// second run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
}
SEND_RSP:
// 4. if get result
// 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE);
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
// 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
@ -150,7 +150,7 @@ int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) {
continue;
}
int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
tqSendExecReq(pTq, pHandle);
}
}

View File

@ -42,6 +42,25 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
code = 0;
goto END;
} else {
if (pHandle->fetchMeta) {
SWalReadHead* pHead = &((*ppHeadWithCkSum)->head);
if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB ||
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE) {
code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);
if (code < 0) {
ASSERT(0);
*fetchOffset = offset;
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
goto END;
}
}
code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum);
if (code < 0) {
ASSERT(0);

View File

@ -172,7 +172,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err;
}
} break;
case TDMT_VND_STREAM_TASK_DROP: {
case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}

View File

@ -50,6 +50,10 @@ void streamCleanUp() {
void streamTriggerByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
return;
}
if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
if (trigger == NULL) return;
@ -82,8 +86,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
}
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
int8_t execStatus = atomic_load_8(&pTask->status);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
int8_t execStatus = atomic_load_8(&pTask->execStatus);
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) return -1;
@ -188,6 +192,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamExec(pTask, pTask->pMsgCb);
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pTask->pMsgCb);
}

View File

@ -33,6 +33,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK);
SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop
return 0;
}
// exec
@ -58,6 +61,10 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
streamTaskExecImpl(pTask, data, pRes);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
return NULL;
}
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
@ -75,12 +82,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
return NULL;
}
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__TRIGGER) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
streamDataSubmitRefDec((SStreamDataSubmit*)data);
taosFreeQitem(data);
} else {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
taosFreeQitem(data);
}
}
streamQueueProcessSuccess(pTask->inputQueue);
return taosArrayInit(0, sizeof(SSDataBlock));
@ -94,25 +106,26 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) return -1;
while (1) {
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) {
int8_t execStatus =
atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
// first run
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
// set status closing
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
// second run, make sure inputQ and qall are cleared
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
taosArrayDestroy(pRes);
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return 0;
} else if (execStatus == TASK_STATUS__CLOSING) {
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
continue;
} else if (execStatus == TASK_STATUS__EXECUTING) {
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
ASSERT(taosArrayGetSize(pRes) == 0);
taosArrayDestroy(pRes);
return 0;
@ -122,7 +135,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
}
FAIL:
if (pRes) taosArrayDestroy(pRes);
atomic_store_8(&pTask->status, TASK_STATUS__IDLE);
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return -1;
}

View File

@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
}
pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->status = TASK_STATUS__IDLE;
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
@ -35,7 +35,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
@ -83,7 +84,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;