Merge pull request #12864 from taosdata/feature/stream
refactor(stream)
This commit is contained in:
commit
65c5604ebe
|
@ -71,8 +71,8 @@ ELSE ()
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
IF (${SANITIZER} MATCHES "true")
|
IF (${SANITIZER} MATCHES "true")
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
|
||||||
MESSAGE(STATUS "Will compile with Address Sanitizer!")
|
MESSAGE(STATUS "Will compile with Address Sanitizer!")
|
||||||
ELSE ()
|
ELSE ()
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
||||||
|
|
|
@ -82,9 +82,7 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(
|
pRes = taos_query(
|
||||||
pConn,
|
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)");
|
||||||
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
|
|
||||||
"from tu1 interval(10m)");
|
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -114,17 +114,12 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
|
||||||
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
|
int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput);
|
||||||
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
void* inputHandle;
|
|
||||||
void* executor;
|
|
||||||
} SStreamRunner;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t parallelizable;
|
int8_t parallelizable;
|
||||||
char* qmsg;
|
char* qmsg;
|
||||||
// followings are not applicable to encoder and decoder
|
// followings are not applicable to encoder and decoder
|
||||||
int8_t numOfRunners;
|
void* inputHandle;
|
||||||
SStreamRunner* runners;
|
void* executor;
|
||||||
} STaskExec;
|
} STaskExec;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -320,17 +315,15 @@ int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
|
||||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
||||||
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
||||||
|
|
||||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId);
|
|
||||||
|
|
||||||
int32_t streamTaskRun(SStreamTask* pTask);
|
int32_t streamTaskRun(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
|
int32_t streamTaskHandleInput(SStreamTask* pTask, void* data);
|
||||||
|
|
||||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp);
|
||||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,9 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
|
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
|
||||||
for (int i = 0; i < pTask->exec.numOfRunners; i++) {
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||||
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
|
||||||
}
|
|
||||||
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
|
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -173,8 +173,7 @@ typedef struct {
|
||||||
} STqExec;
|
} STqExec;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
char* path;
|
char* path;
|
||||||
// STqMetaStore* tqMeta;
|
|
||||||
SHashObj* pushMgr; // consumerId -> STqExec*
|
SHashObj* pushMgr; // consumerId -> STqExec*
|
||||||
SHashObj* execs; // subKey -> STqExec
|
SHashObj* execs; // subKey -> STqExec
|
||||||
SHashObj* pStreamTasks;
|
SHashObj* pStreamTasks;
|
||||||
|
|
|
@ -123,11 +123,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
#if 0
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
|
||||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
|
||||||
#endif
|
|
||||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data);
|
|
||||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -36,15 +36,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
/*ASSERT(0);*/
|
/*ASSERT(0);*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
#if 0
|
|
||||||
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
|
|
||||||
(FTqDelete)taosMemoryFree, 0);
|
|
||||||
if (pTq->tqMeta == NULL) {
|
|
||||||
taosMemoryFree(pTq);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
@ -65,48 +56,6 @@ void tqClose(STQ* pTq) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdSRowDemo() {
|
|
||||||
#define DEMO_N_COLS 3
|
|
||||||
|
|
||||||
int16_t schemaVersion = 0;
|
|
||||||
int32_t numOfCols = DEMO_N_COLS; // ts + int
|
|
||||||
SRowBuilder rb = {0};
|
|
||||||
|
|
||||||
SSchema schema[DEMO_N_COLS] = {
|
|
||||||
{.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = COL_SMA_ON},
|
|
||||||
{.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = COL_SMA_ON},
|
|
||||||
{.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = COL_SMA_ON}};
|
|
||||||
|
|
||||||
SSchema* pSchema = schema;
|
|
||||||
STSchema* pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols);
|
|
||||||
|
|
||||||
tdSRowInit(&rb, schemaVersion);
|
|
||||||
tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen);
|
|
||||||
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema);
|
|
||||||
void* row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough
|
|
||||||
|
|
||||||
// set row buf
|
|
||||||
tdSRowResetBuf(&rb, row);
|
|
||||||
|
|
||||||
for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) {
|
|
||||||
STColumn* pColumn = pTSChema->columns + idx;
|
|
||||||
if (idx == 0) {
|
|
||||||
int64_t tsKey = 1651234567;
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx);
|
|
||||||
} else if (idx == 1) {
|
|
||||||
int32_t val1 = 10;
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx);
|
|
||||||
} else {
|
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// print
|
|
||||||
tdSRowPrint(row, pTSChema, __func__);
|
|
||||||
|
|
||||||
taosMemoryFree(pTSChema);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -261,33 +210,21 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
if (msgType != TDMT_VND_SUBMIT) return 0;
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
|
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
||||||
|
|
||||||
// make sure msgType == TDMT_VND_SUBMIT
|
if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
|
||||||
if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) {
|
// TODO error handle
|
||||||
return -1;
|
}
|
||||||
|
void* data = taosMemoryMalloc(msgLen);
|
||||||
|
if (data == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memcpy(data, msg, msgLen);
|
||||||
|
|
||||||
|
tqProcessStreamTrigger(pTq, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0;
|
|
||||||
|
|
||||||
void* data = taosMemoryMalloc(msgLen);
|
|
||||||
if (data == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
memcpy(data, msg, msgLen);
|
|
||||||
|
|
||||||
tqProcessStreamTriggerNew(pTq, data);
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
SRpcMsg req = {
|
|
||||||
.msgType = TDMT_VND_STREAM_TRIGGER,
|
|
||||||
.pCont = data,
|
|
||||||
.contLen = msgLen,
|
|
||||||
};
|
|
||||||
|
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -685,213 +622,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|
||||||
SMqPollReq* pReq = pMsg->pCont;
|
|
||||||
int64_t consumerId = pReq->consumerId;
|
|
||||||
int64_t fetchOffset;
|
|
||||||
int64_t blockingTime = pReq->blockingTime;
|
|
||||||
int32_t reqEpoch = pReq->epoch;
|
|
||||||
|
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
|
||||||
fetchOffset = walGetFirstVer(pTq->pWal);
|
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
|
||||||
fetchOffset = walGetLastVer(pTq->pWal);
|
|
||||||
} else {
|
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
|
||||||
|
|
||||||
SMqPollRspV2 rspV2 = {0};
|
|
||||||
rspV2.dataLen = 0;
|
|
||||||
|
|
||||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
|
||||||
if (pConsumer == NULL) {
|
|
||||||
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
pMsg->contLen = 0;
|
|
||||||
pMsg->code = -1;
|
|
||||||
tmsgSendRsp(pMsg);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
|
||||||
while (consumerEpoch < reqEpoch) {
|
|
||||||
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
|
|
||||||
}
|
|
||||||
|
|
||||||
STqTopic* pTopic = NULL;
|
|
||||||
int32_t topicSz = taosArrayGetSize(pConsumer->topics);
|
|
||||||
for (int32_t i = 0; i < topicSz; i++) {
|
|
||||||
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
|
|
||||||
// TODO race condition
|
|
||||||
ASSERT(pConsumer->consumerId == consumerId);
|
|
||||||
if (strcmp(topic->topicName, pReq->topic) == 0) {
|
|
||||||
pTopic = topic;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pTopic == NULL) {
|
|
||||||
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
|
|
||||||
TD_VID(pTq->pVnode));
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
pMsg->contLen = 0;
|
|
||||||
pMsg->code = -1;
|
|
||||||
tmsgSendRsp(pMsg);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode));
|
|
||||||
|
|
||||||
rspV2.reqOffset = pReq->currentOffset;
|
|
||||||
rspV2.skipLogNum = 0;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
|
|
||||||
// TODO
|
|
||||||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
|
||||||
if (consumerEpoch > reqEpoch) {
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
|
||||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
SWalReadHead* pHead;
|
|
||||||
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
|
|
||||||
// TODO: no more log, set timer to wait blocking time
|
|
||||||
// if data inserted during waiting, launch query and
|
|
||||||
// response to user
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode), fetchOffset);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
|
||||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
|
||||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
|
||||||
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
|
|
||||||
ASSERT(task);
|
|
||||||
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* pDataBlock = NULL;
|
|
||||||
uint64_t ts;
|
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
if (pDataBlock == NULL) {
|
|
||||||
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush(pRes, pDataBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) == 0) {
|
|
||||||
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
|
||||||
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
|
|
||||||
fetchOffset++;
|
|
||||||
rspV2.skipLogNum++;
|
|
||||||
taosArrayDestroy(pRes);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
rspV2.rspOffset = fetchOffset;
|
|
||||||
|
|
||||||
int32_t blockSz = taosArrayGetSize(pRes);
|
|
||||||
int32_t dataBlockStrLen = 0;
|
|
||||||
for (int32_t i = 0; i < blockSz; i++) {
|
|
||||||
SSDataBlock* pBlock = taosArrayGet(pRes, i);
|
|
||||||
dataBlockStrLen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
void* dataBlockBuf = taosMemoryMalloc(dataBlockStrLen);
|
|
||||||
if (dataBlockBuf == NULL) {
|
|
||||||
pMsg->code = -1;
|
|
||||||
taosMemoryFree(pHead);
|
|
||||||
}
|
|
||||||
|
|
||||||
rspV2.blockData = dataBlockBuf;
|
|
||||||
|
|
||||||
int32_t pos;
|
|
||||||
rspV2.blockPos = taosArrayInit(blockSz, sizeof(int32_t));
|
|
||||||
for (int32_t i = 0; i < blockSz; i++) {
|
|
||||||
pos = 0;
|
|
||||||
SSDataBlock* pBlock = taosArrayGet(pRes, i);
|
|
||||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)dataBlockBuf;
|
|
||||||
pRetrieve->useconds = 0;
|
|
||||||
pRetrieve->precision = 0;
|
|
||||||
pRetrieve->compressed = 0;
|
|
||||||
pRetrieve->completed = 1;
|
|
||||||
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
|
||||||
blockCompressEncode(pBlock, pRetrieve->data, &pos, pBlock->info.numOfCols, false);
|
|
||||||
taosArrayPush(rspV2.blockPos, &rspV2.dataLen);
|
|
||||||
|
|
||||||
int32_t totLen = sizeof(SRetrieveTableRsp) + pos;
|
|
||||||
pRetrieve->compLen = htonl(totLen);
|
|
||||||
rspV2.dataLen += totLen;
|
|
||||||
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, totLen);
|
|
||||||
}
|
|
||||||
ASSERT(POINTER_DISTANCE(dataBlockBuf, rspV2.blockData) <= dataBlockStrLen);
|
|
||||||
|
|
||||||
int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
|
|
||||||
void* buf = rpcMallocCont(msgLen);
|
|
||||||
|
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
|
||||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
|
||||||
((SMqRspHead*)buf)->consumerId = consumerId;
|
|
||||||
|
|
||||||
void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2);
|
|
||||||
|
|
||||||
/*rsp.pBlockData = pRes;*/
|
|
||||||
|
|
||||||
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
|
|
||||||
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
|
|
||||||
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
|
||||||
pHead->msgType, consumerId, pReq->epoch);
|
|
||||||
tmsgSendRsp(&resp);
|
|
||||||
taosMemoryFree(pHead);
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
taosMemoryFree(pHead);
|
|
||||||
fetchOffset++;
|
|
||||||
rspV2.skipLogNum++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*if (blockingTime != 0) {*/
|
|
||||||
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
|
|
||||||
/*} else {*/
|
|
||||||
|
|
||||||
rspV2.rspOffset = fetchOffset - 1;
|
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
pMsg->code = -1;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
|
||||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
|
||||||
((SMqRspHead*)buf)->consumerId = consumerId;
|
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
tEncodeSMqPollRspV2(&abuf, &rspV2);
|
|
||||||
|
|
||||||
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
|
|
||||||
tmsgSendRsp(&resp);
|
|
||||||
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
|
||||||
pReq->epoch);
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
|
@ -981,55 +711,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
|
ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
|
||||||
pTask->status = TASK_STATUS__IDLE;
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
|
||||||
|
|
||||||
pTask->inputQ = taosOpenQueue();
|
|
||||||
pTask->outputQ = taosOpenQueue();
|
|
||||||
pTask->inputQAll = taosAllocateQall();
|
|
||||||
pTask->outputQAll = taosAllocateQall();
|
|
||||||
|
|
||||||
if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
|
|
||||||
goto FAIL;
|
|
||||||
|
|
||||||
if (pTask->execType != TASK_EXEC__NONE) {
|
|
||||||
// expand runners
|
|
||||||
pTask->exec.numOfRunners = parallel;
|
|
||||||
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
|
|
||||||
if (pTask->exec.runners == NULL) {
|
|
||||||
goto FAIL;
|
|
||||||
}
|
|
||||||
for (int32_t i = 0; i < parallel; i++) {
|
|
||||||
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
|
||||||
SReadHandle handle = {
|
|
||||||
.reader = pStreamReader,
|
|
||||||
.meta = pTq->pVnode->pMeta,
|
|
||||||
.pMsgCb = &pTq->pVnode->msgCb,
|
|
||||||
.vnode = pTq->pVnode,
|
|
||||||
};
|
|
||||||
pTask->exec.runners[i].inputHandle = pStreamReader;
|
|
||||||
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
|
||||||
ASSERT(pTask->exec.runners[i].executor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
|
||||||
pTask->tbSink.tbSinkFunc = tqTableSink;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
FAIL:
|
|
||||||
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
|
|
||||||
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
|
|
||||||
if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
|
|
||||||
if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
|
|
||||||
if (pTask) taosMemoryFree(pTask);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
@ -1042,9 +723,31 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
|
pTask->status = TASK_STATUS__IDLE;
|
||||||
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
|
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
|
pTask->inputQ = taosOpenQueue();
|
||||||
|
pTask->outputQ = taosOpenQueue();
|
||||||
|
pTask->inputQAll = taosAllocateQall();
|
||||||
|
pTask->outputQAll = taosAllocateQall();
|
||||||
|
|
||||||
|
if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
|
||||||
|
goto FAIL;
|
||||||
|
|
||||||
// exec
|
// exec
|
||||||
if (tqExpandTask(pTq, pTask, 4) < 0) {
|
if (pTask->execType != TASK_EXEC__NONE) {
|
||||||
ASSERT(0);
|
// expand runners
|
||||||
|
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||||
|
SReadHandle handle = {
|
||||||
|
.reader = pStreamReader,
|
||||||
|
.meta = pTq->pVnode->pMeta,
|
||||||
|
.pMsgCb = &pTq->pVnode->msgCb,
|
||||||
|
.vnode = pTq->pVnode,
|
||||||
|
};
|
||||||
|
pTask->exec.inputHandle = pStreamReader;
|
||||||
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
|
ASSERT(pTask->exec.executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
// sink
|
// sink
|
||||||
|
@ -1052,8 +755,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
if (pTask->sinkType == TASK_SINK__SMA) {
|
if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
pTask->smaSink.smaSink = smaHandleRes;
|
pTask->smaSink.smaSink = smaHandleRes;
|
||||||
} else if (pTask->sinkType == TASK_SINK__TABLE) {
|
} else if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
|
pTask->tbSink.tbSinkFunc = tqTableSink;
|
||||||
|
|
||||||
ASSERT(pTask->tbSink.pSchemaWrapper);
|
ASSERT(pTask->tbSink.pSchemaWrapper);
|
||||||
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
|
||||||
|
|
||||||
pTask->tbSink.pTSchema =
|
pTask->tbSink.pTSchema =
|
||||||
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
|
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
|
||||||
ASSERT(pTask->tbSink.pTSchema);
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
|
@ -1061,94 +768,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
|
||||||
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) {
|
|
||||||
void* pIter = NULL;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
SStreamTask* pTask = (SStreamTask*)pIter;
|
|
||||||
|
|
||||||
if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId) < 0) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
|
|
||||||
SStreamDataSubmit* pSubmit = NULL;
|
|
||||||
|
|
||||||
// build data
|
|
||||||
pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
|
|
||||||
if (pSubmit == NULL) return -1;
|
|
||||||
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
|
|
||||||
if (pSubmit->dataRef == NULL) goto FAIL;
|
|
||||||
*pSubmit->dataRef = 1;
|
|
||||||
pSubmit->data = data;
|
|
||||||
pSubmit->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
|
|
||||||
void* pIter = NULL;
|
|
||||||
while (1) {
|
|
||||||
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
SStreamTask* pTask = (SStreamTask*)pIter;
|
|
||||||
if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) {
|
|
||||||
streamEnqueueDataSubmit(pTask, pSubmit);
|
|
||||||
// TODO cal back pressure
|
|
||||||
}
|
|
||||||
// check run
|
|
||||||
int8_t execStatus = atomic_load_8(&pTask->status);
|
|
||||||
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
|
|
||||||
SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
|
|
||||||
if (pReq == NULL) continue;
|
|
||||||
// TODO: do we need htonl?
|
|
||||||
pReq->head.vgId = pTq->pVnode->config.vgId;
|
|
||||||
pReq->streamId = pTask->streamId;
|
|
||||||
pReq->taskId = pTask->taskId;
|
|
||||||
SRpcMsg msg = {
|
|
||||||
.msgType = 0,
|
|
||||||
.pCont = pReq,
|
|
||||||
.contLen = sizeof(SStreamTaskRunReq),
|
|
||||||
};
|
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
streamDataSubmitRefDec(pSubmit);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
if (pSubmit) {
|
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
|
||||||
if (pSubmit->dataRef) {
|
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
|
||||||
taosMemoryFree(pSubmit->dataRef);
|
if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
|
||||||
}
|
if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
|
||||||
taosFreeQitem(pSubmit);
|
if (pTask) taosMemoryFree(pTask);
|
||||||
}
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) {
|
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
||||||
SStreamTaskExecReq req;
|
|
||||||
tDecodeSStreamTaskExecReq(msg, &req);
|
|
||||||
|
|
||||||
int32_t taskId = req.taskId;
|
|
||||||
ASSERT(taskId);
|
|
||||||
|
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
|
||||||
ASSERT(pTask);
|
|
||||||
|
|
||||||
if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId) < 0) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* pReq) {
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
bool failed = false;
|
bool failed = false;
|
||||||
|
|
||||||
|
@ -1234,7 +864,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchReq* pReq = pMsg->pCont;
|
SStreamDispatchReq* pReq = pMsg->pCont;
|
||||||
int32_t taskId = pReq->taskId;
|
int32_t taskId = pReq->taskId;
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
streamTaskProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1242,7 +872,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||||
int32_t taskId = pReq->taskId;
|
int32_t taskId = pReq->taskId;
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
streamTaskProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1250,7 +880,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchRsp* pRsp = pMsg->pCont;
|
SStreamDispatchRsp* pRsp = pMsg->pCont;
|
||||||
int32_t taskId = pRsp->taskId;
|
int32_t taskId = pRsp->taskId;
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
streamTaskProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1258,6 +888,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
||||||
int32_t taskId = pRsp->taskId;
|
int32_t taskId = pRsp->taskId;
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
streamTaskProcessRecoverRsp(pTask, pRsp);
|
streamProcessRecoverRsp(pTask, pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,13 +142,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
#if 0
|
|
||||||
case TDMT_VND_TASK_WRITE_EXEC: {
|
|
||||||
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
|
|
||||||
0) < 0) {
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
#endif
|
|
||||||
case TDMT_VND_ALTER_VNODE:
|
case TDMT_VND_ALTER_VNODE:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -231,17 +224,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
case TDMT_VND_TASK_RECOVER_RSP:
|
case TDMT_VND_TASK_RECOVER_RSP:
|
||||||
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
||||||
|
|
||||||
#if 0
|
|
||||||
case TDMT_VND_TASK_PIPE_EXEC:
|
|
||||||
case TDMT_VND_TASK_MERGE_EXEC:
|
|
||||||
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
|
|
||||||
case TDMT_VND_STREAM_TRIGGER:{
|
|
||||||
// refactor, avoid double free
|
|
||||||
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
|
|
||||||
pMsg->pCont = NULL;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
case TDMT_VND_QUERY_HEARTBEAT:
|
case TDMT_VND_QUERY_HEARTBEAT:
|
||||||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -134,7 +134,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
|
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
|
||||||
void* exec = pTask->exec.runners[0].executor;
|
void* exec = pTask->exec.executor;
|
||||||
|
|
||||||
// set input
|
// set input
|
||||||
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
|
if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
@ -171,12 +171,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle version
|
// TODO: handle version
|
||||||
int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
if (pRes == NULL) return -1;
|
if (pRes == NULL) return -1;
|
||||||
while (1) {
|
while (1) {
|
||||||
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
|
||||||
void* exec = pTask->exec.runners[0].executor;
|
void* exec = pTask->exec.executor;
|
||||||
if (execStatus == TASK_STATUS__IDLE) {
|
if (execStatus == TASK_STATUS__IDLE) {
|
||||||
// first run, from qall, handle failure from last exec
|
// first run, from qall, handle failure from last exec
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -278,7 +278,7 @@ FAIL:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
bool firstRun = 1;
|
bool firstRun = 1;
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamDataBlock* pBlock = NULL;
|
SStreamDataBlock* pBlock = NULL;
|
||||||
|
@ -407,7 +407,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||||
// 1. handle input
|
// 1. handle input
|
||||||
streamTaskEnqueue(pTask, pReq, pRsp);
|
streamTaskEnqueue(pTask, pReq, pRsp);
|
||||||
|
|
||||||
|
@ -415,172 +415,42 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
|
||||||
// 2.1. idle: exec
|
// 2.1. idle: exec
|
||||||
// 2.2. executing: return
|
// 2.2. executing: return
|
||||||
// 2.3. closing: keep trying
|
// 2.3. closing: keep trying
|
||||||
streamTaskExec2(pTask, pMsgCb);
|
streamExec(pTask, pMsgCb);
|
||||||
|
|
||||||
// 3. handle output
|
// 3. handle output
|
||||||
// 3.1 check and set status
|
// 3.1 check and set status
|
||||||
// 3.2 dispatch / sink
|
// 3.2 dispatch / sink
|
||||||
streamTaskSink(pTask, pMsgCb);
|
streamSink(pTask, pMsgCb);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||||
atomic_store_8(&pTask->inputStatus, pRsp->inputStatus);
|
atomic_store_8(&pTask->inputStatus, pRsp->inputStatus);
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
// TODO: init recover timer
|
// TODO: init recover timer
|
||||||
}
|
}
|
||||||
// continue dispatch
|
// continue dispatch
|
||||||
streamTaskSink(pTask, pMsgCb);
|
streamSink(pTask, pMsgCb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
streamTaskExec2(pTask, pMsgCb);
|
streamExec(pTask, pMsgCb);
|
||||||
streamTaskSink(pTask, pMsgCb);
|
streamSink(pTask, pMsgCb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) {
|
||||||
//
|
//
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
|
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) {
|
||||||
//
|
//
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
|
|
||||||
SArray* pRes = NULL;
|
|
||||||
// source
|
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
|
|
||||||
|
|
||||||
// exec
|
|
||||||
if (pTask->execType != TASK_EXEC__NONE) {
|
|
||||||
ASSERT(workId < pTask->exec.numOfRunners);
|
|
||||||
void* exec = pTask->exec.runners[workId].executor;
|
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pRes == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
|
||||||
qSetStreamInput(exec, input, inputType);
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* output;
|
|
||||||
uint64_t ts;
|
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
if (output == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
taosArrayPush(pRes, output);
|
|
||||||
}
|
|
||||||
} else if (inputType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
|
|
||||||
const SArray* blocks = (const SArray*)input;
|
|
||||||
/*int32_t sz = taosArrayGetSize(blocks);*/
|
|
||||||
/*for (int32_t i = 0; i < sz; i++) {*/
|
|
||||||
/*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
|
|
||||||
/*qSetStreamInput(exec, pBlock, inputType);*/
|
|
||||||
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* output;
|
|
||||||
uint64_t ts;
|
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
if (output == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
taosArrayPush(pRes, output);
|
|
||||||
}
|
|
||||||
/*}*/
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(inputType == STREAM_DATA_TYPE_SSDATA_BLOCK);
|
|
||||||
pRes = (SArray*)input;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0;
|
|
||||||
|
|
||||||
// sink
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
|
||||||
// blockDebugShowData(pRes);
|
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
|
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
|
||||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
|
|
||||||
//
|
|
||||||
} else if (pTask->sinkType == TASK_SINK__FETCH) {
|
|
||||||
//
|
|
||||||
} else {
|
|
||||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
|
||||||
}
|
|
||||||
|
|
||||||
// dispatch
|
|
||||||
|
|
||||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
|
||||||
SRpcMsg dispatchMsg = {0};
|
|
||||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t qType;
|
|
||||||
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
|
|
||||||
qType = FETCH_QUEUE;
|
|
||||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
|
|
||||||
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
|
|
||||||
qType = MERGE_QUEUE;
|
|
||||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
|
|
||||||
qType = WRITE_QUEUE;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
|
||||||
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
|
||||||
SRpcMsg dispatchMsg = {0};
|
|
||||||
SEpSet* pEpSet = NULL;
|
|
||||||
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
|
||||||
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
|
||||||
SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
|
||||||
if (pShuffleRes == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pRes);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SSDataBlock* pDataBlock = taosArrayGet(pRes, i);
|
|
||||||
SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t));
|
|
||||||
if (pArray == NULL) {
|
|
||||||
pArray = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pArray == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*));
|
|
||||||
}
|
|
||||||
taosArrayPush(pArray, pDataBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
|
int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->streamId);
|
tlen += taosEncodeFixedI64(buf, pReq->streamId);
|
||||||
|
@ -607,20 +477,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
|
||||||
pTask->streamId = streamId;
|
pTask->streamId = streamId;
|
||||||
pTask->status = TASK_STATUS__IDLE;
|
pTask->status = TASK_STATUS__IDLE;
|
||||||
|
|
||||||
pTask->inputQ = taosOpenQueue();
|
|
||||||
pTask->outputQ = taosOpenQueue();
|
|
||||||
pTask->inputQAll = taosAllocateQall();
|
|
||||||
pTask->outputQAll = taosAllocateQall();
|
|
||||||
if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL)
|
|
||||||
goto FAIL;
|
|
||||||
return pTask;
|
return pTask;
|
||||||
FAIL:
|
|
||||||
if (pTask->inputQ) taosCloseQueue(pTask->inputQ);
|
|
||||||
if (pTask->outputQ) taosCloseQueue(pTask->outputQ);
|
|
||||||
if (pTask->inputQAll) taosFreeQall(pTask->inputQAll);
|
|
||||||
if (pTask->outputQAll) taosFreeQall(pTask->outputQAll);
|
|
||||||
if (pTask) taosMemoryFree(pTask);
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
|
@ -722,11 +579,7 @@ void tFreeSStreamTask(SStreamTask* pTask) {
|
||||||
taosCloseQueue(pTask->outputQ);
|
taosCloseQueue(pTask->outputQ);
|
||||||
// TODO
|
// TODO
|
||||||
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
|
||||||
for (int32_t i = 0; i < pTask->exec.numOfRunners; i++) {
|
qDestroyTask(pTask->exec.executor);
|
||||||
qDestroyTask(pTask->exec.runners[i].executor);
|
|
||||||
}
|
|
||||||
taosMemoryFree(pTask->exec.runners);
|
|
||||||
/*taosMemoryFree(pTask->executor);*/
|
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue