diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 51e1e996dc..acb99caffc 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -1,12 +1,30 @@ -aux_source_directory(src TMQ_DEMO_SRC) +add_executable(tmq "") +add_executable(tstream "") -add_executable(tmq ${TMQ_DEMO_SRC}) -target_link_libraries( - tmq taos +target_sources(tmq + PRIVATE + "src/tmq.c" ) -target_include_directories( - tmq + +target_sources(tstream + PRIVATE + "src/tstream.c" +) +target_link_libraries(tmq + taos +) + +target_link_libraries(tstream + taos +) + +target_include_directories(tmq + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_include_directories(tstream PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) +SET_TARGET_PROPERTIES(tstream PROPERTIES OUTPUT_NAME tstream) diff --git a/example/src/tstream.c b/example/src/tstream.c new file mode 100644 index 0000000000..62d94b041d --- /dev/null +++ b/example/src/tstream.c @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include "taos.h" + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + return 0; +} + +int32_t create_stream() { + printf("create topic\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + const char* sql = "select ts,k from tu1"; + pRes = tmq_create_stream(pConn, "stream1", "out1", sql); + if (taos_errno(pRes) != 0) { + printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + taos_close(pConn); + return 0; +} + +int main(int argc, char* argv[]) { + int code; + if (argc > 1) { + printf("env init\n"); + code = init_env(); + } + create_stream(); +#if 0 + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + /*perf_loop(tmq, topic_list);*/ + /*basic_consume_loop(tmq, topic_list);*/ + sync_consume_loop(tmq, topic_list); +#endif +} diff --git a/include/client/taos.h b/include/client/taos.h index da24136d80..82f0635612 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -214,7 +214,6 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); -DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); @@ -258,7 +257,12 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); -/* ---------------------- OTHER ---------------------------- */ +/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ +DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); + +DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); + +/* -------------------------------- OTHER -------------------------------- */ typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 297f0c34e6..c91efc3ce2 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -91,6 +91,9 @@ void* blockDataDestroy(SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); +int32_t tEncodeDataBlocks(void** buf, const SArray* blocks); +void* tDecodeDataBlocks(const void* buf, SArray* blocks); + static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { // WARNING: do not use info.numOfCols, // sometimes info.numOfCols != array size diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0eeff31d1c..560490569a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2294,20 +2294,22 @@ enum { typedef struct { void* inputHandle; - void* executor[4]; -} SStreamTaskParRunner; + void* executor; +} SStreamRunner; typedef struct { int64_t streamId; int32_t taskId; int32_t level; int8_t status; - int8_t pipeEnd; - int8_t parallel; + int8_t pipeSource; + int8_t pipeSink; + int8_t numOfRunners; + int8_t parallelizable; SEpSet NextOpEp; char* qmsg; // not applied to encoder and decoder - SStreamTaskParRunner runner; + SStreamRunner runner[8]; // void* executor; // void* stateStore; // storage handle @@ -2339,7 +2341,7 @@ typedef struct { typedef struct { SStreamExecMsgHead head; - // TODO: other info needed by task + SArray* data; // SArray } SStreamTaskExecReq; typedef struct { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 6f925dd9d3..b9d94c2254 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -35,15 +35,15 @@ typedef struct SReadHandle { } SReadHandle; #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 -#define STREAM_DATA_TYPE_SSDAT_BLOCK 0x2 +#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 - /** - * Create the exec task for streaming mode - * @param pMsg - * @param streamReadHandle - * @return - */ -qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); +/** + * Create the exec task for streaming mode + * @param pMsg + * @param streamReadHandle + * @return + */ +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); /** * Set the input data block for the stream scan. @@ -64,16 +64,17 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); */ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd); - /** - * Create the exec task object according to task json - * @param readHandle - * @param vgId - * @param pTaskInfoMsg - * @param pTaskInfo - * @param qId - * @return - */ -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); +/** + * Create the exec task object according to task json + * @param readHandle + * @param vgId + * @param pTaskInfoMsg + * @param pTaskInfo + * @param qId + * @return + */ +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, @@ -83,7 +84,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, * @param handle * @return */ -int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds); +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds); /** * Retrieve the produced results information, if current query is not paused or completed, @@ -146,7 +147,8 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t * @param numOfIndex * @return */ -//int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex); +// int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* +// groupByIndex, int32_t numOfIndex); /** * Update the table id list of a given query. @@ -169,19 +171,19 @@ void* qOpenTaskMgmt(int32_t vgId); * broadcast the close information and wait for all query stop. * @param pExecutor */ -void qTaskMgmtNotifyClosing(void* pExecutor); +void qTaskMgmtNotifyClosing(void* pExecutor); /** * Re-open the query handle management module when opening the vnode again. * @param pExecutor */ -void qQueryMgmtReOpen(void *pExecutor); +void qQueryMgmtReOpen(void* pExecutor); /** * Close query mgmt and clean up resources. * @param pExecutor */ -void qCleanupTaskMgmt(void* pExecutor); +void qCleanupTaskMgmt(void* pExecutor); /** * Add the query into the query mgmt object @@ -190,7 +192,7 @@ void qCleanupTaskMgmt(void* pExecutor); * @param qInfo * @return */ -void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo); +void** qRegisterTask(void* pMgmt, uint64_t qId, void* qInfo); /** * acquire the query handle according to the key from query mgmt object. diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index b0cb4e1116..d05ad06a66 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -471,8 +471,8 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa } sqlLen = strlen(sql); - if (strlen(streamName) >= TSDB_TABLE_NAME_LEN) { - tscError("stream name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1); + if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) { + tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1); terrno = TSDB_CODE_TSC_INVALID_INPUT; goto _return; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b5b9e320e3..aadb74ba86 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1300,3 +1300,26 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { } return (void*)buf; } + +int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) { + int32_t tlen = 0; + int32_t sz = taosArrayGetSize(blocks); + tlen += taosEncodeFixedI32(buf, sz); + + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pBlock = taosArrayGet(blocks, i); + tlen += tEncodeDataBlock(buf, pBlock); + } + + return tlen; +} + +void* tDecodeDataBlocks(const void* buf, SArray* blocks) { + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock pBlock = {0}; + buf = tDecodeDataBlock(buf, &pBlock); + } + return (void*)buf; +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cea07316eb..47872b89d5 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2797,8 +2797,8 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->pipeEnd) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->parallel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->pipeSink) < 0) return -1; + // if (tEncodeI8(pEncoder, pTask->numOfRunners) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; tEndEncode(pEncoder); @@ -2811,8 +2811,8 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->pipeEnd) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->parallel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->pipeSink) < 0) return -1; + // if (tDecodeI8(pDecoder, &pTask->numOfRunners) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; tEndDecode(pDecoder); diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 8445e73787..56a580ed93 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -149,5 +149,4 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg, 0); } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index e4a4cfcd9f..d4af82b382 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -274,6 +274,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); diff --git a/source/dnode/mnode/impl/inc/mndSnode.h b/source/dnode/mnode/impl/inc/mndSnode.h index 8d64879605..180f18a6dd 100644 --- a/source/dnode/mnode/impl/inc/mndSnode.h +++ b/source/dnode/mnode/impl/inc/mndSnode.h @@ -24,6 +24,7 @@ extern "C" { int32_t mndInitSnode(SMnode *pMnode); void mndCleanupSnode(SMnode *pMnode); +SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index c28c0d76c4..b95574ea41 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -20,6 +20,7 @@ #include "mndMnode.h" #include "mndOffset.h" #include "mndShow.h" +#include "mndSnode.h" #include "mndStb.h" #include "mndStream.h" #include "mndSubscribe.h" @@ -31,7 +32,7 @@ #include "tname.h" #include "tuuid.h" -int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet) { +int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncodeSStreamTask(&encoder, pTask); @@ -52,7 +53,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet memcpy(&action.epSet, pEpSet, sizeof(SEpSet)); action.pCont = buf; action.contLen = tlen; - action.msgType = TDMT_SND_TASK_DEPLOY; + action.msgType = type; if (mndTransAppendRedoAction(pTrans, &action) != 0) { rpcFreeCont(buf); return -1; @@ -69,12 +70,27 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY); return 0; } +SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) { + SSnodeObj* pObj = NULL; + pObj = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj); + return pObj; +} + int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) { + int32_t msgLen; + plan->execNode.nodeId = pSnode->id; + plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode); + + if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY); return 0; } @@ -113,8 +129,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // send to vnode SStreamTask* pTask = streamTaskNew(pStream->uid, level); + pTask->pipeSink = level == totLevel - 1 ? 1 : 0; // TODO: set to - pTask->parallel = 4; if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); @@ -122,34 +138,20 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } taosArrayPush(taskOneLevel, pTask); } - - } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { - // duplicatable - - int32_t parallel = 0; - // if no snode, parallel set to fetch thread num in vnode - - // if has snode, set to shared thread num in snode - parallel = SND_SHARED_THREAD_NUM; - - SStreamTask* pTask = streamTaskNew(pStream->uid, level); - pTask->parallel = parallel; - // TODO:get snode id and ep - if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; - } - taosArrayPush(taskOneLevel, pTask); } else { - // not duplicatable SStreamTask* pTask = streamTaskNew(pStream->uid, level); - - // TODO: get snode - if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { - sdbRelease(pSdb, pVgroup); - qDestroyQueryPlan(pPlan); - return -1; + pTask->pipeSink = level == totLevel - 1 ? 1 : 0; + SSnodeObj* pSnode = mndSchedFetchSnode(pMnode); + if (pSnode != NULL) { + if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) { + sdbRelease(pSdb, pSnode); + qDestroyQueryPlan(pPlan); + return -1; + } + sdbRelease(pMnode->pSdb, pSnode); + } else { + // TODO: assign to one vg + ASSERT(0); } taosArrayPush(taskOneLevel, pTask); } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index dda02fecf2..f01b143fbc 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -60,6 +60,15 @@ int32_t mndInitSnode(SMnode *pMnode) { void mndCleanupSnode(SMnode *pMnode) {} +SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode) { + SEpSet epSet; + memcpy(epSet.eps->fqdn, pSnode->pDnode->fqdn, 128); + epSet.eps->port = pSnode->pDnode->port; + epSet.numOfEps = 1; + epSet.inUse = 0; + return epSet; +} + static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) { SSnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId); if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 147f44b260..afa5930821 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -57,8 +57,8 @@ void sndMetaDelete(SStreamMeta *pMeta) { } int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { - for (int i = 0; i < pTask->parallel; i++) { - pTask->runner.executor[i] = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + for (int i = 0; i < pTask->numOfRunners; i++) { + pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); } return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h deleted file mode 100644 index 6391eaffea..0000000000 --- a/source/dnode/vnode/inc/tq.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TQ_H_ -#define _TQ_H_ - -#include "executor.h" -#include "meta.h" -#include "taoserror.h" -#include "tcommon.h" -#include "tmallocator.h" -#include "tmsg.h" -#include "trpc.h" -#include "ttimer.h" -#include "tutil.h" -#include "vnode.h" -#include "wal.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct STQ STQ; - -// memory allocator provided by vnode -typedef struct { - SMemAllocatorFactory* pAllocatorFactory; - SMemAllocator* pAllocator; -} STqMemRef; - -// init once -int tqInit(); -void tqCleanUp(); - -// open in each vnode -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); -void tqClose(STQ*); - -// required by vnode -int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); -int tqCommit(STQ*); - -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); -int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); - -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); - -#ifdef __cplusplus -} -#endif - -#endif /*_TQ_H_*/ diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 9b21cf92ce..4d4bb12a21 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -18,8 +18,8 @@ #include "meta.h" #include "tlog.h" -#include "tq.h" #include "tqPush.h" +#include "vnd.h" #ifdef __cplusplus extern "C" { @@ -153,6 +153,11 @@ typedef struct { FTqDelete pDeleter; } STqMetaStore; +typedef struct { + SMemAllocatorFactory* pAllocatorFactory; + SMemAllocator* pAllocator; +} STqMemRef; + struct STQ { // the collection of groups // the handle of meta kvstore diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index c911f95e4a..ed9aad9277 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -23,7 +23,6 @@ #include "tlist.h" #include "tlockfree.h" #include "tmacro.h" -#include "tq.h" #include "wal.h" #include "vnode.h" @@ -34,6 +33,8 @@ extern "C" { #endif +typedef struct STQ STQ; + typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; @@ -171,6 +172,25 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); void vmaFree(SVMemAllocator* pVMA, void* ptr); bool vmaIsFull(SVMemAllocator* pVMA); +// init once +int tqInit(); +void tqCleanUp(); + +// open in each vnode +STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); +void tqClose(STQ*); + +// required by vnode +int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); +int tqCommit(STQ*); + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); + +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 87c670a4e9..3af79ca461 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -70,6 +70,46 @@ void tqClose(STQ* pTq) { int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { if (msgType != TDMT_VND_SUBMIT) return 0; + + void* pIter = NULL; + + while (1) { + pIter = taosHashIterate(pTq->pStreamTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = (SStreamTask*)pIter; + if (!pTask->pipeSource) continue; + + int32_t workerId = 0; + void* exec = pTask->runner[workerId].executor; + qSetStreamInput(exec, msg, STREAM_DATA_TYPE_SUBMIT_BLOCK); + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* output; + uint64_t ts; + if (qExecTask(exec, &output, &ts) < 0) { + ASSERT(false); + } + if (output == NULL) { + break; + } + taosArrayPush(pRes, output); + } + if (pTask->pipeSink) { + // write back + } else { + int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead)); + tEncodeDataBlocks(abuf, pRes); + // serialize + // to next level + } + } + +#if 0 void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL); while (pIter != NULL) { STqPusher* pusher = *(STqPusher**)pIter; @@ -97,6 +137,7 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { // if handle waiting, launch query and response to consumer // // if no waiting handle, return +#endif return 0; } @@ -420,6 +461,21 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { return 0; } +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { + ASSERT(parallel <= 8); + pTask->numOfRunners = parallel; + for (int32_t i = 0; i < parallel; i++) { + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = { + .reader = pReadHandle, + .meta = pTq->pVnodeMeta, + }; + pTask->runner[i].inputHandle = pReadHandle; + pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, &handle); + } + return 0; +} + int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { SStreamTask* pTask = malloc(sizeof(SStreamTask)); if (pTask == NULL) { @@ -430,12 +486,118 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { tDecodeSStreamTask(&decoder, pTask); tCoderClear(&decoder); + tqExpandTask(pTq, pTask, 8); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); return 0; } +static char* formatTimestamp(char* buf, int64_t val, int precision) { + time_t tt; + int32_t ms = 0; + if (precision == TSDB_TIME_PRECISION_NANO) { + tt = (time_t)(val / 1000000000); + ms = val % 1000000000; + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + tt = (time_t)(val / 1000000); + ms = val % 1000000; + } else { + tt = (time_t)(val / 1000); + ms = val % 1000; + } + + /* comment out as it make testcases like select_with_tags.sim fail. + but in windows, this may cause the call to localtime crash if tt < 0, + need to find a better solution. + if (tt < 0) { + tt = 0; + } + */ + +#ifdef WINDOWS + if (tt < 0) tt = 0; +#endif + if (tt <= 0 && ms < 0) { + tt--; + if (precision == TSDB_TIME_PRECISION_NANO) { + ms += 1000000000; + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + ms += 1000000; + } else { + ms += 1000; + } + } + + struct tm* ptm = localtime(&tt); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); + + if (precision == TSDB_TIME_PRECISION_NANO) { + sprintf(buf + pos, ".%09d", ms); + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + sprintf(buf + pos, ".%06d", ms); + } else { + sprintf(buf + pos, ".%03d", ms); + } + + return buf; +} +void tqDebugShowSSData(SArray* dataBlocks) { + char pBuf[128]; + int32_t sz = taosArrayGetSize(dataBlocks); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); + int32_t colNum = pDataBlock->info.numOfCols; + int32_t rows = pDataBlock->info.rows; + for (int32_t j = 0; j < rows; j++) { + printf("|"); + for (int32_t k = 0; k < colNum; k++) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch (pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); + printf(" %25s |", pBuf); + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + printf(" %15u |", *(uint32_t*)var); + break; + } + } + printf("\n"); + } + } +} + int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { - // + SStreamTaskExecReq* pReq = msg->pCont; + + int32_t taskId = pReq->head.streamTaskId; + int32_t workerType = pReq->head.workerType; + + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + // assume worker id is 1 + int32_t workerId = 1; + void* exec = pTask->runner[workerId].executor; + int32_t sz = taosArrayGetSize(pReq->data); + printf("input data:\n"); + tqDebugShowSSData(pReq->data); + SArray* pRes = taosArrayInit(0, sizeof(void*)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* input = taosArrayGet(pReq->data, i); + SSDataBlock* output; + uint64_t ts; + qSetStreamInput(exec, input, STREAM_DATA_TYPE_SSDATA_BLOCK); + if (qExecTask(exec, &output, &ts) < 0) { + ASSERT(0); + } + if (output == NULL) { + break; + } + taosArrayPush(pRes, &output); + } + printf("output data:\n"); + tqDebugShowSSData(pRes); + return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 7ab435bdae..2f5f94b55f 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -14,6 +14,7 @@ */ #include "vnodeQuery.h" +#include "executor.h" #include "vnd.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index c3f53b6b91..ade9adb1e1 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -13,12 +13,11 @@ * along with this program. If not, see . */ -#include "tq.h" #include "vnd.h" void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; - SRpcMsg *pRpc; + SRpcMsg *pRpc; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); diff --git a/source/dnode/vnode/test/tqSerializerTest.cpp b/source/dnode/vnode/test/tqSerializerTest.cpp deleted file mode 100644 index 0d76322c17..0000000000 --- a/source/dnode/vnode/test/tqSerializerTest.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include -#include -#include -#include - -#include "tq.h" - -using namespace std; - -TEST(TqSerializerTest, basicTest) { - TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle)); - -} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e6cdbcf10f..19100d7560 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -16,7 +16,7 @@ #include "executor.h" #include "executorimpl.h" #include "planner.h" -#include "tq.h" +#include "vnode.h" static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { ASSERT(pOperator != NULL); @@ -52,9 +52,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t SSDataBlock* pDataBlock = input; pInfo->pRes->info = pDataBlock->info; - for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { - pInfo->pRes->pDataBlock = pDataBlock->pDataBlock; - } + taosArrayClear(pInfo->pRes->pDataBlock); + taosArrayAddAll(pInfo->pRes->pDataBlock, pDataBlock->pDataBlock); // set current block valid. pInfo->blockValid = true; @@ -121,7 +120,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA // traverse to the streamscan node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; - while(pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { pInfo = pInfo->pDownstream[0]; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6b2398f599..407df1e90b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4721,12 +4721,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo *pOperator, bool* newgroup) SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; + pTaskInfo->code = pOperator->_openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } - if (pInfo->blockType == STREAM_DATA_TYPE_SSDAT_BLOCK) { + if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->blockValid) { pInfo->blockValid = false; // this block can only be used once. return pInfo->pRes;