commit
07a80c5bab
|
@ -31,27 +31,27 @@ typedef void TAOS_SUB;
|
||||||
typedef void **TAOS_ROW;
|
typedef void **TAOS_ROW;
|
||||||
|
|
||||||
// Data type definition
|
// Data type definition
|
||||||
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
|
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
|
||||||
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
|
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
|
||||||
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
|
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
|
||||||
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
|
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
|
||||||
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
|
#define TSDB_DATA_TYPE_INT 4 // 4 bytes
|
||||||
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
|
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
|
||||||
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
|
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
|
||||||
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
|
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
|
||||||
#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
|
#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
|
||||||
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
|
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
|
||||||
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
|
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string
|
||||||
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
|
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
|
||||||
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
|
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
|
||||||
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
|
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes
|
||||||
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
|
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
|
||||||
#define TSDB_DATA_TYPE_JSON 15 // json string
|
#define TSDB_DATA_TYPE_JSON 15 // json string
|
||||||
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
|
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
|
||||||
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal
|
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal
|
||||||
#define TSDB_DATA_TYPE_BLOB 18 // binary
|
#define TSDB_DATA_TYPE_BLOB 18 // binary
|
||||||
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
|
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
|
||||||
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
|
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_OPTION_LOCALE,
|
TSDB_OPTION_LOCALE,
|
||||||
|
@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
|
||||||
void tmqShowMsg(tmq_message_t *tmq_message);
|
void tmqShowMsg(tmq_message_t *tmq_message);
|
||||||
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
|
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
|
||||||
|
|
||||||
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
|
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
|
||||||
|
|
||||||
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT* stmt);
|
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
|
||||||
|
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
|
||||||
|
|
||||||
|
/* ---------------------- OTHER ---------------------------- */
|
||||||
|
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
|
||||||
|
|
||||||
|
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||||
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
|
||||||
|
if (!pColumnInfoData->hasNull) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
|
return pColumnInfoData->varmeta.offset[row] == -1;
|
||||||
|
} else {
|
||||||
|
if (pColumnInfoData->nullbitmap == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
|
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
|
||||||
SColumnDataAgg* pColAgg) {
|
SColumnDataAgg* pColAgg) {
|
||||||
if (!pColumnInfoData->hasNull) {
|
if (!pColumnInfoData->hasNull) {
|
||||||
|
@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
|
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
|
||||||
|
|
||||||
|
// SColumnInfoData, rowNumber
|
||||||
#define colDataGetData(p1_, r_) \
|
#define colDataGetData(p1_, r_) \
|
||||||
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
|
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
|
||||||
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
|
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
|
||||||
|
|
||||||
|
@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_COMMON_EP_H_*/
|
#endif /*_TD_COMMON_EP_H_*/
|
||||||
|
|
|
@ -419,7 +419,7 @@ typedef struct {
|
||||||
};
|
};
|
||||||
} SColumnFilterList;
|
} SColumnFilterList;
|
||||||
/*
|
/*
|
||||||
* for client side struct, we only need the column id, type, bytes are not necessary
|
* for client side struct, only column id, type, bytes are necessary
|
||||||
* But for data in vnode side, we need all the following information.
|
* But for data in vnode side, we need all the following information.
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -2158,13 +2158,36 @@ typedef struct {
|
||||||
SArray* topics; // SArray<SMqSubTopicEp>
|
SArray* topics; // SArray<SMqSubTopicEp>
|
||||||
} SMqCMGetSubEpRsp;
|
} SMqCMGetSubEpRsp;
|
||||||
|
|
||||||
struct tmq_message_t {
|
typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
union {
|
union {
|
||||||
SMqPollRsp consumeRsp;
|
SMqPollRsp consumeRsp;
|
||||||
SMqCMGetSubEpRsp getEpRsp;
|
SMqCMGetSubEpRsp getEpRsp;
|
||||||
};
|
};
|
||||||
void* extra;
|
void* extra;
|
||||||
|
} SMqMsgWrapper;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t curBlock;
|
||||||
|
int32_t curRow;
|
||||||
|
void** uData;
|
||||||
|
} SMqRowIter;
|
||||||
|
|
||||||
|
struct tmq_message_t_v1 {
|
||||||
|
SMqPollRsp rsp;
|
||||||
|
SMqRowIter iter;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct tmq_message_t {
|
||||||
|
SMqRspHead head;
|
||||||
|
union {
|
||||||
|
SMqPollRsp consumeRsp;
|
||||||
|
SMqCMGetSubEpRsp getEpRsp;
|
||||||
|
};
|
||||||
|
void* extra;
|
||||||
|
int32_t curBlock;
|
||||||
|
int32_t curRow;
|
||||||
|
void** uData;
|
||||||
};
|
};
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
|
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
|
||||||
|
|
|
@ -189,6 +189,7 @@ enum {
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||||
|
|
|
@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
|
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
|
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
|
||||||
|
pRsp->curBlock = 0;
|
||||||
|
pRsp->curRow = 0;
|
||||||
|
// TODO: alloc mem
|
||||||
|
/*pRsp->*/
|
||||||
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
|
||||||
if (pRsp->consumeRsp.numOfTopics == 0) {
|
if (pRsp->consumeRsp.numOfTopics == 0) {
|
||||||
/*printf("no data\n");*/
|
/*printf("no data\n");*/
|
||||||
|
@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tmq's epoch is monotomically increase,
|
// tmq's epoch is monotonically increase,
|
||||||
// so it's safe to discard any old epoch msg.
|
// so it's safe to discard any old epoch msg.
|
||||||
// epoch will only increase when received newer epoch ep msg
|
// Epoch will only increase when received newer epoch ep msg
|
||||||
SMqRspHead* head = pMsg->pData;
|
SMqRspHead* head = pMsg->pData;
|
||||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||||
if (head->epoch <= epoch) {
|
if (head->epoch <= epoch) {
|
||||||
|
@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) {
|
||||||
return "fail";
|
return "fail";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_ROW tmq_get_row(tmq_message_t* message) {
|
||||||
|
SMqPollRsp* rsp = &message->consumeRsp;
|
||||||
|
while (1) {
|
||||||
|
if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) {
|
||||||
|
SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock);
|
||||||
|
if (message->curRow < pBlock->info.rows) {
|
||||||
|
for (int i = 0; i < pBlock->info.numOfCols; i++) {
|
||||||
|
SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
if (colDataIsNull_s(pData, message->curRow))
|
||||||
|
message->uData[i] = NULL;
|
||||||
|
else {
|
||||||
|
message->uData[i] = colDataGetData(pData, message->curRow);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
message->curRow++;
|
||||||
|
return message->uData;
|
||||||
|
} else {
|
||||||
|
message->curBlock++;
|
||||||
|
message->curRow = 0;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
||||||
tmq_t* pTmq = malloc(sizeof(tmq_t));
|
tmq_t* pTmq = malloc(sizeof(tmq_t));
|
||||||
|
|
|
@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
|
||||||
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
|
||||||
|
// TODO encode tasks
|
||||||
|
if (pObj->tasks) {
|
||||||
|
int32_t sz = taosArrayGetSize(pObj->tasks);
|
||||||
|
tEncodeI32(pEncoder, sz);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SArray *pArray = taosArrayGet(pObj->tasks, i);
|
||||||
|
int32_t innerSz = taosArrayGetSize(pArray);
|
||||||
|
tEncodeI32(pEncoder, innerSz);
|
||||||
|
for (int32_t j = 0; j < innerSz; j++) {
|
||||||
|
SStreamTask *pTask = taosArrayGet(pArray, j);
|
||||||
|
tEncodeSStreamTask(pEncoder, pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tEncodeI32(pEncoder, 0);
|
||||||
|
}
|
||||||
return pEncoder->pos;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
|
||||||
|
int32_t sz;
|
||||||
|
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||||
|
if (sz != 0) {
|
||||||
|
pObj->tasks = taosArrayInit(sz, sizeof(SArray));
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
int32_t innerSz;
|
||||||
|
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
|
||||||
|
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask));
|
||||||
|
for (int32_t j = 0; j < innerSz; j++) {
|
||||||
|
SStreamTask task;
|
||||||
|
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1;
|
||||||
|
taosArrayPush(pArray, &task);
|
||||||
|
}
|
||||||
|
taosArrayPush(pObj->tasks, pArray);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pObj->tasks = NULL;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
taosArrayPush(taskOneLevel, pTask);
|
||||||
|
|
||||||
|
SCoder encoder;
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = plan->execNode.epSet;
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = tlen;
|
||||||
|
action.msgType = TDMT_VND_TASK_DEPLOY;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
|
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
|
||||||
// duplicatable
|
// duplicatable
|
||||||
|
@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
taosArrayPush(taskOneLevel, pTask);
|
||||||
|
|
||||||
|
SCoder encoder;
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = plan->execNode.epSet;
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = tlen;
|
||||||
|
action.msgType = TDMT_SND_TASK_DEPLOY;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// not duplicatable
|
// not duplicatable
|
||||||
|
@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(taskOneLevel, pTask);
|
taosArrayPush(taskOneLevel, pTask);
|
||||||
|
|
||||||
|
SCoder encoder;
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
|
||||||
|
tEncodeSStreamTask(&encoder, pTask);
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = plan->execNode.epSet;
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = tlen;
|
||||||
|
action.msgType = TDMT_SND_TASK_DEPLOY;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
rpcFreeCont(buf);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosArrayPush(pStream->tasks, taskOneLevel);
|
taosArrayPush(pStream->tasks, taskOneLevel);
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,6 +230,12 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
|
||||||
}
|
}
|
||||||
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
|
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
||||||
|
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
|
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
@ -238,12 +244,6 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
|
|
||||||
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
|
@ -55,6 +55,7 @@ int tqCommit(STQ*);
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
||||||
int32_t tqProcessRebReq(STQ* pTq, char* msg);
|
int32_t tqProcessRebReq(STQ* pTq, char* msg);
|
||||||
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,6 +161,7 @@ struct STQ {
|
||||||
STqMemRef tqMemRef;
|
STqMemRef tqMemRef;
|
||||||
STqMetaStore* tqMeta;
|
STqMetaStore* tqMeta;
|
||||||
STqPushMgr* tqPushMgr;
|
STqPushMgr* tqPushMgr;
|
||||||
|
SHashObj* pStreamTasks;
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
SMeta* pVnodeMeta;
|
SMeta* pVnodeMeta;
|
||||||
};
|
};
|
||||||
|
|
|
@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
return pTq;
|
return pTq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
SStreamTask* pTask = malloc(sizeof(SStreamTask));
|
||||||
|
if (pTask == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
SCoder decoder;
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
|
||||||
|
tDecodeSStreamTask(&decoder, pTask);
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
|
||||||
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
|
|
||||||
if (pVnode->config.streamMode == 0) {
|
if (pVnode->config.streamMode == 0) {
|
||||||
|
@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_CREATE_STB: {
|
case TDMT_VND_CREATE_STB: {
|
||||||
SVCreateTbReq vCreateTbReq = {0};
|
SVCreateTbReq vCreateTbReq = {0};
|
||||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
||||||
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
|
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_VND_ALTER_STB: {
|
case TDMT_VND_ALTER_STB: {
|
||||||
SVCreateTbReq vAlterTbReq = {0};
|
SVCreateTbReq vAlterTbReq = {0};
|
||||||
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
|
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
|
||||||
free(vAlterTbReq.stbCfg.pSchema);
|
free(vAlterTbReq.stbCfg.pSchema);
|
||||||
|
@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
|
case TDMT_VND_TASK_DEPLOY: {
|
||||||
|
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
|
}
|
||||||
|
} break;
|
||||||
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
|
||||||
SSmaCfg vCreateSmaReq = {0};
|
SSmaCfg vCreateSmaReq = {0};
|
||||||
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue