diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 97410311c2..afa0fa2a6e 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -67,7 +67,7 @@ enum { enum { // WARN: new msg should be appended to segment tail #endif - TD_NEW_MSG_SEG(TDMT_DND_MSG) + TD_NEW_MSG_SEG(TDMT_DND_MSG) // 0<<8 TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_QNODE, "dnode-create-qnode", NULL, NULL) @@ -87,7 +87,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_MND_MSG) + TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8 TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "alter-acct", NULL, NULL) @@ -195,7 +195,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_MSG) + TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL) @@ -244,7 +244,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_SCH_MSG) + TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_QUERY, "merge-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL) @@ 
-259,13 +259,13 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_STREAM_MSG) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) + TD_NEW_MSG_SEG(TDMT_STREAM_MSG) //4 << 8 + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) //1025 1026 TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036 TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) @@ -275,10 +275,10 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_MON_MSG) + TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_SYNC_MSG) + TD_NEW_MSG_SEG(TDMT_SYNC_MSG) //6 << 8 TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT_ELECTION, "sync-elect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) // no longer used @@ -309,7 +309,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, 
"sync-force-become-follower", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) + TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8 TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) @@ -318,7 +318,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) - TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) + TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8 TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) diff --git a/include/dnode/vnode/stream.h b/include/dnode/vnode/stream.h deleted file mode 100644 index 6d86847542..0000000000 --- a/include/dnode/vnode/stream.h +++ /dev/null @@ -1,18 +0,0 @@ -// -// Created by mingming wanng on 2023/11/15. -// - -#ifndef TDENGINE_STREAM_H -#define TDENGINE_STREAM_H - -#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) -#define STREAM_EXEC_START_ALL_TASKS_ID (-2) -#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) - -typedef struct STaskUpdateEntry { - int64_t streamId; - int32_t taskId; - int32_t transId; -} STaskUpdateEntry; - -#endif // TDENGINE_STREAM_H diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h new file mode 100644 index 0000000000..75dafcdbff --- /dev/null +++ b/include/dnode/vnode/tqCommon.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. 
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TQ_COMMON_H +#define TDENGINE_TQ_COMMON_H + +// message process +int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); +int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); +int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); +int32_t startStreamTasks(SStreamMeta* pMeta); +int32_t resetStreamTaskStatus(SStreamMeta* pMeta); + +#endif // TDENGINE_TQ_COMMON_H diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f91223b863..9c112e1c4c 
100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,6 +50,10 @@ extern "C" { (_t)->hTaskInfo.id.streamId = 0; \ } while (0) +#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) +#define STREAM_EXEC_START_ALL_TASKS_ID (-2) +#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) + typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; diff --git a/include/os/osFile.h b/include/os/osFile.h index e9d685ed69..503535a454 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -65,6 +65,7 @@ typedef struct TdFile *TdFilePtr; #define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile #define TD_FILE_WRITE_THROUGH 0x0200 #define TD_FILE_CLOEXEC 0x0400 + TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions); TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 2091bcf64f..3a91ae82d5 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -876,6 +876,7 @@ int taos_get_current_db(TAOS *taos, char *database, int len, int *required) { code = 0; } taosThreadMutexUnlock(&pTscObj->mutex); + releaseTscObj(*(int64_t *)taos); return code; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 89995fc326..1623d9f062 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -140,7 +140,7 @@ static const SSysDbTableSchema userStbsSchema[] = { {.name = "columns", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "table_comment", .bytes = TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "table_comment", .bytes = TSDB_TB_COMMENT_LEN - 1 + 
VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "watermark", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "max_delay", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "rollup", .bytes = 128 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index e8402eb7c0..9220d3395d 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -193,10 +193,6 @@ int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->streamWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) { - sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg); - } else { - taosWriteQitem(pWorker->queue, pMsg); - } + taosWriteQitem(pWorker->queue, pMsg); return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d80bc62c47..9a792a2774 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -206,11 +206,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp break; case STREAM_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); - if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) { - vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg); - } else { - taosWriteQitem(pVnode->pStreamQ, pMsg); - } + taosWriteQitem(pVnode->pStreamQ, pMsg); break; case FETCH_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt index 2da1f9adac..4cd8e26e78 100644 --- a/source/dnode/snode/CMakeLists.txt +++ b/source/dnode/snode/CMakeLists.txt @@ -17,4 +17,5 @@ 
target_link_libraries( PRIVATE stream PRIVATE wal PRIVATE index + PRIVATE tqCommon ) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7d1443215d..380be1dd38 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -16,8 +16,7 @@ #include "executor.h" #include "rsync.h" #include "sndInt.h" -#include "stream.h" -#include "tstream.h" +#include "tqCommon.h" #include "tuuid.h" #define sndError(...) \ @@ -41,45 +40,6 @@ } \ } while (0) -void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { - char * msgStr = pMsg->pCont; - char * msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; - - SStreamDispatchReq req; - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); - if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - goto FAIL; - } - - tDecoderClear(&decoder); - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); - if (pTask) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchMsg(pTask, &req, &rsp); - tDeleteStreamDispatchReq(&req); - streamMetaReleaseTask(pSnode->pMeta, pTask); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - return; - } else { - tDeleteStreamDispatchReq(&req); - return; - } - -FAIL: - if (pMsg->info.handle == NULL) return; - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); @@ -129,161 +89,31 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer // checkpoint ver is the kept version, handled data 
should be the next version. if (pTask->chkInfo.checkpointId != 0) { pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; - sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, - pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); - } else { - if (pTask->chkInfo.nextProcessVer == -1) { - pTask->chkInfo.nextProcessVer = 0; - } + sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } char *p = NULL; streamTaskGetStatus(pTask, &p); - sndInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, - pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam); - + if (pTask->info.fillHistory) { + sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, + (int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam); + } else { + sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " nextProcessVer:%" PRId64 + " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", + SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + pTask->info.selfChildId, 
pTask->info.taskLevel, p, pTask->info.fillHistory, + (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam); + } return 0; } -int32_t sndStartStreamTasks(SSnode *pSnode) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = SNODE_HANDLE; - SStreamMeta *pMeta = pSnode->pMeta; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - sndDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - SArray *pTaskList = NULL; - streamMetaWLock(pMeta); - pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = taosGetTimestampMs(); - streamMetaWUnLock(pMeta); - - // broadcast the check downstream tasks msg - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId *pTaskId = taosArrayGet(pTaskList, i); - SStreamTask * pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - continue; - } - - // fill-history task can only be launched by related stream tasks. - if (pTask->info.fillHistory == 1) { - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - if (pTask->status.downstreamReady == 1) { - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); - streamLaunchFillHistoryTask(pTask); - } - - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? 
TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); - if (ret != TSDB_CODE_SUCCESS) { - code = ret; - } - - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - return code; -} - -int32_t sndResetStreamTaskStatus(SSnode *pSnode) { - SStreamMeta *pMeta = pSnode->pMeta; - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - - sndDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId *pTaskId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - SStreamTask **pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - streamTaskResetStatus(*pTask); - } - - return 0; -} - -int32_t sndRestartStreamTasks(SSnode *pSnode) { - SStreamMeta *pMeta = pSnode->pMeta; - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); - - while (1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - sndDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - terrno = 0; - sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, - pMeta->updateInfo.transId); - - while (streamMetaTaskInTimer(pMeta)) { - sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - sndError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } - - streamMetaInitBackend(pMeta); - int64_t el = taosGetTimestampMs() - st; - - sndInfo("vgId:%d close&reload state elapsed time:%.3fs", 
vgId, el / 1000.); - - code = streamMetaLoadAllTasks(pMeta); - if (code != TSDB_CODE_SUCCESS) { - sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } - sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - sndResetStreamTaskStatus(pSnode); - - streamMetaWUnLock(pMeta); - sndStartStreamTasks(pSnode); - - code = terrno; - return code; -} - SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); if (pSnode == NULL) { @@ -318,9 +148,9 @@ FAIL: return NULL; } -int32_t sndInit(SSnode *pSnode) { - sndResetStreamTaskStatus(pSnode); - sndStartStreamTasks(pSnode); +int32_t sndInit(SSnode * pSnode) { + resetStreamTaskStatus(pSnode->pMeta); + startStreamTasks(pSnode->pMeta); return 0; } @@ -333,582 +163,30 @@ void sndClose(SSnode *pSnode) { taosMemoryFree(pSnode); } -int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } - -int32_t sndStartStreamTaskAsync(SSnode *pSnode, bool restart) { - SStreamMeta *pMeta = pSnode->pMeta; - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - sndDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - SStreamTaskRunReq *pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - sndError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - return -1; - } - - sndDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = restart ? 
STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pSnode->msgCb, STREAM_QUEUE, &msg); - return 0; -} - -int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { - int32_t code; - - // 1.deserialize msg and build task - SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msg, msgLen); - code = tDecodeStreamTask(&decoder, pTask); - if (code < 0) { - tDecoderClear(&decoder); - taosMemoryFree(pTask); - return -1; - } - - tDecoderClear(&decoder); - - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG); - - // 2.save task - streamMetaWLock(pSnode->pMeta); - - bool added = false; - code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added); - if (code < 0) { - streamMetaWUnLock(pSnode->pMeta); - return -1; - } - - int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); - streamMetaWUnLock(pSnode->pMeta); - - char *p = NULL; - streamTaskGetStatus(pTask, &p); - - sndDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, - pTask->id.idStr, p, numOfTasks); - - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? 
TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - streamTaskHandleEvent(pTask->status.pSM, event); - return 0; -} - -int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { - SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; - sndDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); - streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId); - - // commit the update - streamMetaWLock(pSnode->pMeta); - int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); - sndDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks); - - if (streamMetaCommit(pSnode->pMeta) < 0) { - // persist to disk - } - streamMetaWUnLock(pSnode->pMeta); - return 0; -} - -int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { - SStreamTaskRunReq *pReq = pMsg->pCont; - - int32_t taskId = pReq->taskId; - - if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { - sndStartStreamTasks(pSnode); - return 0; - } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { - sndRestartStreamTasks(pSnode); - return 0; - } - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); - if (pTask) { - streamExecTask(pTask); - streamMetaReleaseTask(pSnode->pMeta, pTask); - return 0; - } else { - return -1; - } -} - -int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { - char * msgStr = pMsg->pCont; - char * msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SStreamDispatchReq req; - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); - tDecodeStreamDispatchReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); - if (pTask) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchMsg(pTask, &req, &rsp); - tDeleteStreamDispatchReq(&req); - 
streamMetaReleaseTask(pSnode->pMeta, pTask); - return 0; - } else { - tDeleteStreamDispatchReq(&req); - return -1; - } -} - -int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { - char * msgStr = pMsg->pCont; - char * msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SStreamRetrieveReq req; - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); - tDecodeStreamRetrieveReq(&decoder, &req); - tDecoderClear(&decoder); - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId); - - if (pTask) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessRetrieveReq(pTask, &req, &rsp); - streamMetaReleaseTask(pSnode->pMeta, pTask); - tDeleteStreamRetrieveReq(&req); - return 0; - } else { - return -1; - } -} - -int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { - SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - - pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); - pRsp->streamId = htobe64(pRsp->streamId); - pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); - pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); - pRsp->stage = htobe64(pRsp->stage); - pRsp->msgId = htonl(pRsp->msgId); - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId); - if (pTask) { - streamProcessDispatchRsp(pTask, pRsp, pMsg->code); - streamMetaReleaseTask(pSnode->pMeta, pTask); - return 0; - } else { - return -1; - } -} - -int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) { - // - return 0; -} - -int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { - char * msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamScanHistoryFinishReq req; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msg, msgLen); - tDecodeStreamScanHistoryFinishReq(&decoder, &req); - 
tDecoderClear(&decoder); - - // find task - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - return -1; - } - // do process request - if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) { - streamMetaReleaseTask(pSnode->pMeta, pTask); - return -1; - } - - streamMetaReleaseTask(pSnode->pMeta, pTask); - return 0; -} - -int32_t sndProcessTaskScanHistoryFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) { - char * msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamCompleteHistoryMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msg, msgLen); - tDecodeCompleteHistoryDataMsg(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - sndError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", - pSnode->pMeta->vgId, req.upstreamTaskId); - return -1; - } - - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - if (remain > 0) { - sndDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", - pTask->id.idStr, req.downstreamId, remain); - } else { - sndDebug( - "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr, req.downstreamId); - streamProcessScanHistoryFinishRsp(pTask); - } - - streamMetaReleaseTask(pSnode->pMeta, pTask); - return 0; -} - -// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task -int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pSnode->pMeta; - char * msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; - - 
SStreamCheckpointReadyMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msg, len); - if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - return code; - } - tDecoderClear(&decoder); - - SStreamTask *pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - sndError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, - req.downstreamTaskId); - return code; - } - - sndDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); - - streamProcessCheckpointReadyMsg(pTask); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { - char * msgStr = pMsg->pCont; - char * msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - SStreamTaskCheckReq req; - SDecoder decoder; - - tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); - tDecodeStreamTaskCheckReq(&decoder, &req); - tDecoderClear(&decoder); - - int32_t taskId = req.downstreamTaskId; - - SStreamTaskCheckRsp rsp = { - .reqId = req.reqId, - .streamId = req.streamId, - .childId = req.childId, - .downstreamNodeId = req.downstreamNodeId, - .downstreamTaskId = req.downstreamTaskId, - .upstreamNodeId = req.upstreamNodeId, - .upstreamTaskId = req.upstreamTaskId, - }; - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); - - if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); - streamMetaReleaseTask(pSnode->pMeta, pTask); - char *p = NULL; - streamTaskGetStatus(pTask, &p); - sndDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", - pTask->id.idStr, p, rsp.reqId, 
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } else { - rsp.status = TASK_DOWNSTREAM_NOT_READY; - sndDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp status %d", - taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } - - SEncoder encoder; - int32_t code; - int32_t len; - - tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code); - if (code < 0) { - sndError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId); - return -1; - } - - void *buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead *)buf)->vgId = htonl(req.upstreamNodeId); - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, (uint8_t *)abuf, len); - tEncodeStreamTaskCheckRsp(&encoder, &rsp); - tEncoderClear(&encoder); - - SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; - - tmsgSendRsp(&rspMsg); - return 0; -} - -int32_t sndProcessStreamTaskCheckRsp(SSnode *pSnode, SRpcMsg *pMsg) { - char * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - - int32_t code; - SStreamTaskCheckRsp rsp; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)pReq, len); - code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); - - if (code < 0) { - tDecoderClear(&decoder); - return -1; - } - - tDecoderClear(&decoder); - sndDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - - SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId); - if (pTask == NULL) { - sndError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, - pSnode->pMeta->vgId); - return -1; - } - - code = streamProcessCheckRsp(pTask, &rsp); - streamMetaReleaseTask(pSnode->pMeta, 
pTask); - return code; -} - -int32_t sndProcessTaskUpdateReq(SSnode *pSnode, SRpcMsg *pMsg) { - SStreamMeta *pMeta = pSnode->pMeta; - int32_t vgId = SNODE_HANDLE; - char * msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - - SStreamTaskNodeUpdateMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t *)msg, len); - if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { - rsp.code = TSDB_CODE_MSG_DECODE_ERROR; - sndError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); - tDecoderClear(&decoder); - return rsp.code; - } - - tDecoderClear(&decoder); - - // update the nodeEpset when it exists - streamMetaWLock(pMeta); - - // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; - SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL || *ppTask == NULL) { - sndError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, - req.taskId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); - - taosArrayDestroy(req.pNodeList); - return rsp.code; - } - - SStreamTask *pTask = *ppTask; - - if (pMeta->updateInfo.transId != req.transId) { - pMeta->updateInfo.transId = req.transId; - sndInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. 
- taosHashClear(pMeta->updateInfo.pTasks); - } else { - sndDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); - } - - STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; - void * exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); - if (exist != NULL) { - sndDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, - req.transId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return rsp.code; - } - - streamMetaWUnLock(pMeta); - - // the following two functions should not be executed within the scope of meta lock to avoid deadlock - streamTaskUpdateEpsetInfo(pTask, req.pNodeList); - streamTaskResetStatus(pTask); - - // continue after lock the meta again - streamMetaWLock(pMeta); - - SStreamTask **ppHTask = NULL; - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - ppHTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (ppHTask == NULL || *ppHTask == NULL) { - sndError( - "vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", - pMeta->vgId, req.taskId); - CLEAR_RELATED_FILLHISTORY_TASK(pTask); - } else { - sndDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); - streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); - } - } - - { - streamMetaSaveTask(pMeta, pTask); - if (ppHTask != NULL) { - streamMetaSaveTask(pMeta, *ppHTask); - } - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - } - - streamTaskStop(pTask); - - // keep the already handled info - taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); - - if (ppHTask != NULL) { - streamTaskStop(*ppHTask); - sndDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", - 
pTask->id.idStr); - taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); - } else { - sndDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); - } - - rsp.code = 0; - - // possibly only handle the stream task. - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - - pMeta->startInfo.tasksWillRestart = 1; - - if (updateTasks < numOfTasks) { - sndDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, - updateTasks, (numOfTasks - updateTasks)); - streamMetaWUnLock(pMeta); - } else { - sndDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); -#if 1 - sndStartStreamTaskAsync(pSnode, true); - streamMetaWUnLock(pMeta); -#else - streamMetaWUnLock(pMeta); - - // For debug purpose. - // the following procedure consume many CPU resource, result in the re-election of leader - // with high probability. So we employ it as a test case for the stream processing framework, with - // checkpoint/restart/nodeUpdate etc. 
- while (1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d start all stream tasks after all being updated", vgId); - tqResetStreamTaskStatus(pTq); - tqStartStreamTaskAsync(pTq, false); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); -#endif - } - - taosArrayDestroy(req.pNodeList); - return rsp.code; -} - int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: - return sndProcessTaskRunReq(pSnode, pMsg); + return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true); case TDMT_STREAM_TASK_DISPATCH: - return sndProcessTaskDispatchReq(pSnode, pMsg, true); + return tqStreamTaskProcessDispatchReq(pSnode->pMeta, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: - return sndProcessTaskDispatchRsp(pSnode, pMsg); + return tqStreamTaskProcessDispatchRsp(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE: - return sndProcessTaskRetrieveReq(pSnode, pMsg); - case TDMT_STREAM_RETRIEVE_RSP: - return sndProcessTaskRetrieveRsp(pSnode, pMsg); + return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg); + case TDMT_STREAM_RETRIEVE_RSP: // 1036 + 
break; case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return sndProcessTaskScanHistoryFinishReq(pSnode, pMsg); + return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return sndProcessTaskScanHistoryFinishRsp(pSnode, pMsg); + return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK: - return sndProcessStreamTaskCheckReq(pSnode, pMsg); + return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: - return sndProcessStreamTaskCheckRsp(pSnode, pMsg); + return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true); case TDMT_STREAM_TASK_CHECKPOINT_READY: - return sndProcessTaskCheckpointReadyMsg(pSnode, pMsg); + return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); default: + sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); } return 0; @@ -919,14 +197,13 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_STREAM_TASK_DEPLOY: { void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - return sndProcessTaskDeployReq(pSnode, pReq, len); + return tqStreamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true); } case TDMT_STREAM_TASK_DROP: - return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: - sndProcessTaskUpdateReq(pSnode, pMsg); - break; + return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); default: ASSERT(0); } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 635c15aa41..9aeb14cd60 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -1,4 +1,5 @@ # vnode +add_subdirectory(src/tqCommon) add_library(vnode STATIC "") set( VNODE_SOURCE_FILES @@ -117,6 +118,7 @@ if (${BUILD_CONTRIB}) PUBLIC 
"inc" PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" + PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" ) else() @@ -125,6 +127,7 @@ else() PUBLIC "inc" PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" + PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" ) if (${TD_LINUX}) target_include_directories( @@ -138,11 +141,6 @@ else() endif() endif() -target_include_directories( - vnode - PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode" -) - target_link_libraries( vnode PUBLIC os @@ -163,6 +161,7 @@ target_link_libraries( PUBLIC transport PUBLIC stream PUBLIC index + PUBLIC tqCommon ) IF (TD_GRANT) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6a0c991be4..9307a620ad 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -253,8 +253,6 @@ bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); -int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); - // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b3f8317add..cf57623a43 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -153,7 +153,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tqStream -int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 16379db053..473329bfee 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -234,7 +234,6 @@ 
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg); -int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart); int32_t tqRestartStreamTasks(STQ* pTq); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); @@ -263,7 +262,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 070ea1d5ad..f0f62938ce 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,8 +14,8 @@ */ #include "tq.h" -#include "stream.h" #include "vnd.h" +#include "tqCommon.h" typedef struct { int8_t inited; @@ -904,175 +904,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SStreamMeta* pMeta = pTq->pStreamMeta; - - SStreamTaskCheckReq req; - SDecoder decoder; - - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamTaskCheckReq(&decoder, &req); - tDecoderClear(&decoder); - - int32_t taskId = req.downstreamTaskId; - - SStreamTaskCheckRsp rsp = { - .reqId = req.reqId, - .streamId = req.streamId, - .childId = req.childId, - .downstreamNodeId = req.downstreamNodeId, - .downstreamTaskId = 
req.downstreamTaskId, - .upstreamNodeId = req.upstreamNodeId, - .upstreamTaskId = req.upstreamTaskId, - }; - - // only the leader node handle the check request - if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError( - "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", - taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); - rsp.status = TASK_DOWNSTREAM_NOT_LEADER; - } else { - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); - if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); - streamMetaReleaseTask(pMeta, pTask); - - char* p = NULL; - streamTaskGetStatus(pTask, &p); - tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 - ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } else { - rsp.status = TASK_DOWNSTREAM_NOT_READY; - tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp check_status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } - } - - return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); + return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t vgId = pTq->pStreamMeta->vgId; - - int32_t code; - SStreamTaskCheckRsp rsp; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pReq, len); - code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); - if (code < 0) { - terrno = TSDB_CODE_INVALID_MSG; - tDecoderClear(&decoder); - tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno)); - return -1; - } - - tDecoderClear(&decoder); - 
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); - - if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, - rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); - return code; - } - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId); - if (pTask == NULL) { - tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return -1; - } - - code = streamProcessCheckRsp(pTask, &rsp); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return code; + return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int32_t code = 0; - int32_t vgId = TD_VID(pTq->pVnode); - - if (tsDisableStream) { - tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); - return code; - } - - tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); - - // 1.deserialize msg and build task - int32_t size = sizeof(SStreamTask); - SStreamTask* pTask = taosMemoryCalloc(1, size); - if (pTask == NULL) { - tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); - return TSDB_CODE_OUT_OF_MEMORY; - } - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - code = tDecodeStreamTask(&decoder, pTask); - tDecoderClear(&decoder); - - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pTask); - return TSDB_CODE_INVALID_MSG; - } - - SStreamMeta* pStreamMeta = pTq->pStreamMeta; - - // 2.save task, use the latest commit version as the 
initial start version of stream task. - int32_t taskId = pTask->id.taskId; - int64_t streamId = pTask->id.streamId; - bool added = false; - - streamMetaWLock(pStreamMeta); - code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); - int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); - streamMetaWUnLock(pStreamMeta); - - if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, - tstrerror(code)); - tFreeStreamTask(pTask); - return code; - } - - // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if - // it is added into the meta store - if (added) { - // only handled in the leader node - if (vnodeIsRoleLeader(pTq->pVnode)) { - tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); - - bool restored = pTq->pVnode->restored; - if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? 
TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - streamTaskHandleEvent(p->status.pSM, event); - } else if (!restored) { - tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); - } - - if (p != NULL) { - streamMetaReleaseTask(pStreamMeta, p); - } - } else { - tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId); - } - } else { - tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); - tFreeStreamTask(pTask); - } - - return code; + return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored); } static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { @@ -1254,190 +1094,39 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // only the agg tasks and the sink tasks will receive this message from upstream tasks int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamScanHistoryFinishReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", - pTq->pStreamMeta->vgId, req.downstreamTaskId); - return -1; - } - - tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); - - int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return code; + return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskScanHistoryFinishRsp(STQ* 
pTq, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamCompleteHistoryMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeCompleteHistoryDataMsg(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", - pTq->pStreamMeta->vgId, req.upstreamTaskId); - return -1; - } - - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - if (remain > 0) { - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", - pTask->id.idStr, req.downstreamId, remain); - } else { - tqDebug( - "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr, req.downstreamId); - streamProcessScanHistoryFinishRsp(pTask); - } - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; + return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - int32_t vgId = TD_VID(pTq->pVnode); if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal tqScanWal(pTq); return 0; - } else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { - tqStartStreamTasks(pTq); - return 0; - } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { - tqRestartStreamTasks(pTq); - return 0; } - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); - if (pTask != NULL) { // even in halt status, the data in inputQ must be processed - char* p = NULL; - if (streamTaskReadyToRun(pTask, &p)) { - tqDebug("vgId:%d s-task:%s 
start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); - streamExecTask(pTask); - } else { - int8_t status = streamTaskSetSchedStatusInactive(pTask); - tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, - pTask->id.idStr, p, status); - } - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + if(code == 0 && taskId > 0){ tqScanWalAsync(pTq, false); - return 0; - } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. - // todo add one function to handle this - tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); - return -1; } + return code; } -int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - SStreamDispatchReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamDispatchReq(&decoder, &req); - tDecoderClear(&decoder); - - tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); - if (pTask) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchMsg(pTask, &req, &rsp); - tDeleteStreamDispatchReq(&req); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; - } else { - tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already", - pTq->pStreamMeta->vgId, req.taskId); - tDeleteStreamDispatchReq(&req); - return -1; - } +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg); } 
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { - SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - - int32_t vgId = pTq->pStreamMeta->vgId; - pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); - pRsp->streamId = htobe64(pRsp->streamId); - pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); - pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); - pRsp->stage = htobe64(pRsp->stage); - pRsp->msgId = htonl(pRsp->msgId); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRsp->streamId, pRsp->upstreamTaskId); - if (pTask) { - streamProcessDispatchRsp(pTask, pRsp, pMsg->code); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return TSDB_CODE_SUCCESS; - } else { - tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; - return terrno; - } + return tqStreamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { - SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - - int32_t vgId = TD_VID(pTq->pVnode); - SStreamMeta* pMeta = pTq->pStreamMeta; - tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask != NULL) { - // drop the related fill-history task firstly - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHTaskId = &pTask->hTaskInfo.id; - streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); - tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); - } - streamMetaReleaseTask(pMeta, pTask); - } - - // drop the stream task now - streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); - - // commit the update - streamMetaWLock(pMeta); - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, 
numOfTasks); - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - streamMetaWUnLock(pMeta); - - return 0; + return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { @@ -1546,30 +1235,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SDecoder decoder; - - SStreamRetrieveReq req; - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - tDecodeStreamRetrieveReq(&decoder, &req); - tDecoderClear(&decoder); - - int32_t vgId = pTq->pStreamMeta->vgId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); - if (pTask == NULL) { - tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", vgId, - req.dstTaskId); - return -1; - } - - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessRetrieveReq(pTask, &req, &rsp); - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tDeleteStreamRetrieveReq(&req); - return 0; + return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { @@ -1577,89 +1243,6 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -// todo refactor. 
-int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { - STQ* pTq = pVnode->pTq; - int32_t vgId = pVnode->config.vgId; - - SMsgHead* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; - - SStreamDispatchReq req; - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); - if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - goto FAIL; - } - tDecoderClear(&decoder); - - int32_t taskId = req.taskId; - tqDebug("vgId:%d receive dispatch msg to s-task:0x%" PRIx64 "-0x%x", vgId, req.streamId, taskId); - - // for test purpose - // if (req.type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - // code = TSDB_CODE_STREAM_TASK_NOT_EXIST; - // goto FAIL; - // } - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); - if (pTask != NULL) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchMsg(pTask, &req, &rsp); - tDeleteStreamDispatchReq(&req); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - return 0; - } else { - tDeleteStreamDispatchReq(&req); - } - - code = TSDB_CODE_STREAM_TASK_NOT_EXIST; - -FAIL: - if (pMsg->info.handle == NULL) { - tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId); - return -1; - } - - SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - if (pRspHead == NULL) { - SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info}; - tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code)); - tmsgSendRsp(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - return -1; - } - - pRspHead->vgId = htonl(req.upstreamNodeId); - ASSERT(pRspHead->vgId != 0); - - SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead)); - pRsp->streamId = htobe64(req.streamId); - 
pRsp->upstreamTaskId = htonl(req.upstreamTaskId); - pRsp->upstreamNodeId = htonl(req.upstreamNodeId); - pRsp->downstreamNodeId = htonl(pVnode->config.vgId); - pRsp->downstreamTaskId = htonl(req.taskId); - pRsp->msgId = htonl(req.msgId); - pRsp->stage = htobe64(req.stage); - pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; - - int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - SRpcMsg rsp = {.code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; - tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code)); - - tmsgSendRsp(&rsp); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - return -1; -} - int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -1784,220 +1367,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { - int32_t vgId = TD_VID(pTq->pVnode); - SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; - - SStreamCheckpointReadyMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - return code; - } - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); - return code; - } - - tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, - pTask->id.idStr, req.downstreamTaskId, 
req.downstreamNodeId); - - streamProcessCheckpointReadyMsg(pTask); - streamMetaReleaseTask(pMeta, pTask); - return code; + return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg); } int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = TD_VID(pTq->pVnode); - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - - SStreamTaskNodeUpdateMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { - rsp.code = TSDB_CODE_MSG_DECODE_ERROR; - tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); - tDecoderClear(&decoder); - return rsp.code; - } - - tDecoderClear(&decoder); - - // update the nodeEpset when it exists - streamMetaWLock(pMeta); - - // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL || *ppTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, - req.taskId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); - - taosArrayDestroy(req.pNodeList); - return rsp.code; - } - - SStreamTask* pTask = *ppTask; - - if (pMeta->updateInfo.transId != req.transId) { - pMeta->updateInfo.transId = req.transId; - tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. 
- taosHashClear(pMeta->updateInfo.pTasks); - } else { - tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId); - } - - STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; - void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); - if (exist != NULL) { - tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, - req.transId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return rsp.code; - } - - streamMetaWUnLock(pMeta); - - // the following two functions should not be executed within the scope of meta lock to avoid deadlock - streamTaskUpdateEpsetInfo(pTask, req.pNodeList); - streamTaskResetStatus(pTask); - - // continue after lock the meta again - streamMetaWLock(pMeta); - - SStreamTask** ppHTask = NULL; - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (ppHTask == NULL || *ppHTask == NULL) { - tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", - pMeta->vgId, req.taskId); - CLEAR_RELATED_FILLHISTORY_TASK(pTask); - } else { - tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); - streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); - } - } - - { - streamMetaSaveTask(pMeta, pTask); - if (ppHTask != NULL) { - streamMetaSaveTask(pMeta, *ppHTask); - } - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } - } - - streamTaskStop(pTask); - - // keep the already handled info - taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); - - if (ppHTask != NULL) { - streamTaskStop(*ppHTask); - tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); - 
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); - } else { - tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); - } - - rsp.code = 0; - - // possibly only handle the stream task. - int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - - pMeta->startInfo.tasksWillRestart = 1; - - if (updateTasks < numOfTasks) { - tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, - updateTasks, (numOfTasks - updateTasks)); - streamMetaWUnLock(pMeta); - } else { - if (!pTq->pVnode->restored) { - tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", - vgId); - pMeta->startInfo.tasksWillRestart = 0; - streamMetaWUnLock(pMeta); - } else { - tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); - -#if 1 - tqStartStreamTaskAsync(pTq, true); - streamMetaWUnLock(pMeta); -#else - streamMetaWUnLock(pMeta); - - // For debug purpose. - // the following procedure consume many CPU resource, result in the re-election of leader - // with high probability. So we employ it as a test case for the stream processing framework, with - // checkpoint/restart/nodeUpdate etc. 
- while (1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - - int32_t code = streamMetaReopen(pMeta); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - streamMetaInitBackend(pMeta); - - if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return -1; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d start all stream tasks after all being updated", vgId); - tqResetStreamTaskStatus(pTq); - tqStartStreamTaskAsync(pTq, false); - } else { - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - streamMetaWUnLock(pMeta); -#endif - } - } - - taosArrayDestroy(req.pNodeList); - return rsp.code; + return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 56dcdb2abc..1b0a76e81c 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -15,7 +15,6 @@ #include "tq.h" #include "vnd.h" -#include "stream.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 #define SCAN_WAL_IDLE_DURATION 100 @@ -61,154 +60,6 @@ int32_t tqScanWal(STQ* pTq) { return 0; } -int32_t tqStartStreamTasks(STQ* pTq) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = TD_VID(pTq->pVnode); - SStreamMeta* pMeta = pTq->pStreamMeta; - - 
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - SArray* pTaskList = NULL; - streamMetaWLock(pMeta); - pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = taosGetTimestampMs(); - streamMetaWUnLock(pMeta); - - // broadcast the check downstream tasks msg - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - continue; - } - - // fill-history task can only be launched by related stream tasks. - if (pTask->info.fillHistory == 1) { - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - if (pTask->status.downstreamReady == 1) { - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); - streamLaunchFillHistoryTask(pTask); - } - - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? 
TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); - if (ret != TSDB_CODE_SUCCESS) { - code = ret; - } - - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - return code; -} - -int32_t tqRestartStreamTasks(STQ* pTq) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); - - while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); - if (startVal == 0) { - break; - } - - tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); - taosMsleep(500); - } - - terrno = 0; - tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, - pMeta->updateInfo.transId); - - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } - - streamMetaInitBackend(pMeta); - int64_t el = taosGetTimestampMs() - st; - - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); - - code = streamMetaLoadAllTasks(pTq->pStreamMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } - - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); - tqResetStreamTaskStatus(pTq); - - streamMetaWUnLock(pMeta); - tqStartStreamTasks(pTq); - } else { - streamMetaResetStartInfo(&pMeta->startInfo); - streamMetaWUnLock(pMeta); - tqInfo("vgId:%d, follower node not start stream tasks", vgId); - } - - code = 
terrno; - return code; -} - -int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - return 0; - } - - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - return -1; - } - - tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); - pRunReq->head.vgId = vgId; - pRunReq->streamId = 0; - pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; - - SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - return 0; -} - int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; @@ -302,27 +153,6 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } -int32_t tqResetStreamTaskStatus(STQ* pTq) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - - tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - streamTaskResetStatus(*pTask); - } - - return 0; -} - int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = 
walReaderGetValidFirstVer(pTask->exec.pWalReader); diff --git a/source/dnode/vnode/src/tqCommon/CMakeLists.txt b/source/dnode/vnode/src/tqCommon/CMakeLists.txt new file mode 100644 index 0000000000..aea0e709e3 --- /dev/null +++ b/source/dnode/vnode/src/tqCommon/CMakeLists.txt @@ -0,0 +1,20 @@ +aux_source_directory(. TQ_SOURCE_FILES) +add_library(tqCommon STATIC ${TQ_SOURCE_FILES}) +target_include_directories( + tqCommon + PUBLIC "../inc" + PUBLIC "../../inc" +) + +target_link_libraries( + tqCommon + PRIVATE stream + PRIVATE common + PRIVATE transport + PRIVATE executor + PRIVATE index + PRIVATE qcom + PRIVATE qworker + PRIVATE sync + PRIVATE tfs +) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c new file mode 100644 index 0000000000..aee2aaa244 --- /dev/null +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -0,0 +1,809 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */
+
+#include "tstream.h"
+#include "tmsgcb.h"
+#include "tq.h"
+
+typedef struct STaskUpdateEntry {
+  int64_t streamId;
+  int32_t taskId;
+  int32_t transId;
+} STaskUpdateEntry;
+
+// Post a TDMT_STREAM_TASK_RUN message to the stream queue that asks the vnode to start (or, when
+// restart is true, restart) all stream tasks. Returns 0 when there is nothing to start or the
+// message was queued, -1 (terrno set) when the message buffer cannot be allocated.
+int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
+  int32_t vgId = pMeta->vgId;
+
+  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
+  if (numOfTasks == 0) {
+    tqDebug("vgId:%d no stream tasks existed to run", vgId);
+    return 0;
+  }
+
+  SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
+  if (pRunReq == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
+    return -1;
+  }
+
+  tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
+  pRunReq->head.vgId = vgId;
+  pRunReq->streamId = 0;
+  pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
+
+  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
+  tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
+  return 0;
+}
+
+// Handle a node-endpoint update message for one stream task: decode it, update the epset of the
+// task (and of its related fill-history task, if any), persist and stop the task(s), and record
+// the task(s) as handled for this transaction. Once every task of the vnode has been handled and
+// the vnode is restored, an async restart of all tasks is requested through cb.
+// Returns TSDB_CODE_MSG_DECODE_ERROR on a malformed message, otherwise TSDB_CODE_SUCCESS.
+int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
+  int32_t vgId = pMeta->vgId;
+  char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t len = pMsg->contLen - sizeof(SMsgHead);
+  SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
+
+  SStreamTaskNodeUpdateMsg req = {0};
+
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, len);
+  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
+    rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
+    tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
+    tDecoderClear(&decoder);
+    return rsp.code;
+  }
+
+  tDecoderClear(&decoder);
+
+  // update the nodeEpset when it exists
+  streamMetaWLock(pMeta);
+
+  // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
+  STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
+  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
+  if (ppTask == NULL || *ppTask == NULL) {
+    tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
+            req.taskId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  SStreamTask* pTask = *ppTask;
+
+  if (pMeta->updateInfo.transId != req.transId) {
+    pMeta->updateInfo.transId = req.transId;
+    tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
+    // info needs to be kept till the new trans to update the nodeEp arrived.
+    taosHashClear(pMeta->updateInfo.pTasks);
+  } else {
+    tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
+  }
+
+  STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
+  void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
+  if (exist != NULL) {
+    tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
+            req.transId);
+    rsp.code = TSDB_CODE_SUCCESS;
+    streamMetaWUnLock(pMeta);
+    taosArrayDestroy(req.pNodeList);
+    return rsp.code;
+  }
+
+  streamMetaWUnLock(pMeta);
+
+  // the following two functions should not be executed within the scope of meta lock to avoid deadlock
+  streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
+  streamTaskResetStatus(pTask);
+
+  // continue after lock the meta again
+  streamMetaWLock(pMeta);
+
+  SStreamTask** ppHTask = NULL;
+  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
+    ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
+    if (ppHTask == NULL || *ppHTask == NULL) {
+      // report the fill-history task id (not the stream task id) before the relation is cleared
+      tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
+              pMeta->vgId, (int32_t)pTask->hTaskInfo.id.taskId);
+      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
+      // the slot may exist while holding a NULL task; forget it so that *ppHTask is never dereferenced below
+      ppHTask = NULL;
+    } else {
+      tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
+      streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
+    }
+  }
+
+  {
+    streamMetaSaveTask(pMeta, pTask);
+    if (ppHTask != NULL) {
+      streamMetaSaveTask(pMeta, *ppHTask);
+    }
+
+    // persist to disk; a failure is reported but does not abort the update procedure
+    if (streamMetaCommit(pMeta) < 0) {
+      tqError("vgId:%d failed to commit the updated nodeEp info of s-task:%s", vgId, pTask->id.idStr);
+    }
+  }
+
+  streamTaskStop(pTask);
+
+  // keep the already handled info
+  taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
+
+  if (ppHTask != NULL) {
+    streamTaskStop(*ppHTask);
+    tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
+
+    // record the fill-history task with the same key layout that the lookup above uses, so that a later update
+    // msg addressed to the fill-history task in the same trans is recognized as already handled
+    STaskUpdateEntry hEntry = {
+        .streamId = (*ppHTask)->id.streamId, .taskId = (*ppHTask)->id.taskId, .transId = req.transId};
+    taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
+  } else {
+    tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
+  }
+
+  rsp.code = 0;
+
+  // possibly only handle the stream task.
+  int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
+  int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
+
+  pMeta->startInfo.tasksWillRestart = 1;
+
+  if (updateTasks < numOfTasks) {
+    tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
+            updateTasks, (numOfTasks - updateTasks));
+    streamMetaWUnLock(pMeta);
+  } else {
+    if (!restored) {
+      tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
+      pMeta->startInfo.tasksWillRestart = 0;
+      streamMetaWUnLock(pMeta);
+    } else {
+      tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
+#if 1
+      tqStreamTaskStartAsync(pMeta, cb, true);
+      streamMetaWUnLock(pMeta);
+#else
+      streamMetaWUnLock(pMeta);
+
+      // For debug purpose.
+      // the following procedure consume many CPU resource, result in the re-election of leader
+      // with high probability.
So we employ it as a test case for the stream processing framework, with + // checkpoint/restart/nodeUpdate etc. + while (1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + int32_t code = streamMetaReopen(pMeta); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + streamMetaInitBackend(pMeta); + + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d start all stream tasks after all being updated", vgId); + resetStreamTaskStatus(pTq->pStreamMeta); + tqStartStreamTaskAsync(pTq, false); + } else { + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + streamMetaWUnLock(pMeta); +#endif + } + } + + taosArrayDestroy(req.pNodeList); + return rsp.code; +} + +int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + SStreamDispatchReq req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); + if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { + tDecoderClear(&decoder); + return TSDB_CODE_MSG_DECODE_ERROR; + } + tDecoderClear(&decoder); + + tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId); + + SStreamTask* pTask = 
streamMetaAcquireTask(pMeta, req.streamId, req.taskId); + if (pTask) { + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){ + return -1; + } + tDeleteStreamDispatchReq(&req); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } else { + tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already", + pMeta->vgId, req.taskId); + + SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (pRspHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId); + return -1; + } + + pRspHead->vgId = htonl(req.upstreamNodeId); + ASSERT(pRspHead->vgId != 0); + + SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead)); + pRsp->streamId = htobe64(req.streamId); + pRsp->upstreamTaskId = htonl(req.upstreamTaskId); + pRsp->upstreamNodeId = htonl(req.upstreamNodeId); + pRsp->downstreamNodeId = htonl(pMeta->vgId); + pRsp->downstreamTaskId = htonl(req.taskId); + pRsp->msgId = htonl(req.msgId); + pRsp->stage = htobe64(req.stage); + pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; + + int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; + tqError("s-task:0x%x send dispatch error rsp, no task", req.taskId); + + tmsgSendRsp(&rsp); + tDeleteStreamDispatchReq(&req); + + return 0; + } +} + +int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + + int32_t vgId = pMeta->vgId; + pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId); + pRsp->streamId = htobe64(pRsp->streamId); + pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId); + pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId); + pRsp->stage = htobe64(pRsp->stage); + pRsp->msgId = htonl(pRsp->msgId); + + 
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId); + if (pTask) { + streamProcessDispatchRsp(pTask, pRsp, pMsg->code); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } else { + tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); + terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; + return terrno; + } +} + +int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + + SStreamRetrieveReq req; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); + tDecodeStreamRetrieveReq(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId); + if (pTask == NULL) { + tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + req.dstTaskId); + return -1; + } + + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + streamProcessRetrieveReq(pTask, &req, &rsp); + + streamMetaReleaseTask(pMeta, pTask); + tDeleteStreamRetrieveReq(&req); + return 0; +} + +int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamScanHistoryFinishReq req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeStreamScanHistoryFinishReq(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", + pMeta->vgId, req.downstreamTaskId); + return -1; + } + + tqDebug("s-task:%s receive scan-history finish msg from 
task:0x%x", pTask->id.idStr, req.upstreamTaskId); + + int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + // deserialize + SStreamCompleteHistoryMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecodeCompleteHistoryDataMsg(&decoder, &req); + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", + pMeta->vgId, req.upstreamTaskId); + return -1; + } + + int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); + if (remain > 0) { + tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", + pTask->id.idStr, req.downstreamId, remain); + } else { + tqDebug( + "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " + "completed msg", + pTask->id.idStr, req.downstreamId); + streamProcessScanHistoryFinishRsp(pTask); + } + + streamMetaReleaseTask(pMeta, pTask); + return 0; +} + +int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + + SStreamTaskCheckReq req; + SDecoder decoder; + + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); + tDecodeStreamTaskCheckReq(&decoder, &req); + tDecoderClear(&decoder); + + int32_t taskId = req.downstreamTaskId; + + SStreamTaskCheckRsp rsp = { + .reqId = req.reqId, + .streamId = req.streamId, + .childId = req.childId, + .downstreamNodeId = req.downstreamNodeId, + 
.downstreamTaskId = req.downstreamTaskId, + .upstreamNodeId = req.upstreamNodeId, + .upstreamTaskId = req.upstreamTaskId, + }; + + // only the leader node handle the check request + if (pMeta->role == NODE_ROLE_FOLLOWER) { + tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + rsp.status = TASK_DOWNSTREAM_NOT_LEADER; + } else { + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); + if (pTask != NULL) { + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); + streamMetaReleaseTask(pMeta, pTask); + + char* p = NULL; + streamTaskGetStatus(pTask, &p); + tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", + pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } else { + rsp.status = TASK_DOWNSTREAM_NOT_READY; + tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp check_status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } + } + + return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); +} + +int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t vgId = pMeta->vgId; + + int32_t code; + SStreamTaskCheckRsp rsp; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)pReq, len); + code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); + if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&decoder); + tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno)); + return -1; + } + + tDecoderClear(&decoder); + tqDebug("tq task:0x%x 
(vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + + if (!isLeader) { + tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, + rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); + return code; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); + if (pTask == NULL) { + tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + rsp.streamId, rsp.upstreamTaskId, vgId); + terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; + return -1; + } + + code = streamProcessCheckRsp(pTask, &rsp); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamCheckpointReadyMsg req = {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + return code; + } + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); + return code; + } + + tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); + + streamProcessCheckpointReadyMsg(pTask); + streamMetaReleaseTask(pMeta, pTask); + return code; +} + +int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, 
bool restored) { + int32_t code = 0; + int32_t vgId = pMeta->vgId; + + if (tsDisableStream) { + tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); + return code; + } + + tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); + + // 1.deserialize msg and build task + int32_t size = sizeof(SStreamTask); + SStreamTask* pTask = taosMemoryCalloc(1, size); + if (pTask == NULL) { + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + code = tDecodeStreamTask(&decoder, pTask); + tDecoderClear(&decoder); + + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pTask); + return TSDB_CODE_INVALID_MSG; + } + + // 2.save task, use the latest commit version as the initial start version of stream task. + int32_t taskId = pTask->id.taskId; + int64_t streamId = pTask->id.streamId; + bool added = false; + + streamMetaWLock(pMeta); + code = streamMetaRegisterTask(pMeta, sversion, pTask, &added); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + streamMetaWUnLock(pMeta); + + if (code < 0) { + tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); + tFreeStreamTask(pTask); + return code; + } + + // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if + // it is added into the meta store + if (added) { + // only handled in the leader node + if (isLeader) { + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); + SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); + + if (p != NULL && restored && p->info.fillHistory == 0) { + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? 
TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + streamTaskHandleEvent(p->status.pSM, event); + } else if (!restored) { + tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); + } + + if (p != NULL) { + streamMetaReleaseTask(pMeta, p); + } + } else { + tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId); + } + } else { + tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); + tFreeStreamTask(pTask); + } + + return code; +} + +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { + SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; + + int32_t vgId = pMeta->vgId; + tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask != NULL) { + // drop the related fill-history task firstly + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pHTaskId = &pTask->hTaskInfo.id; + streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); + tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); + } + streamMetaReleaseTask(pMeta, pTask); + } + + // drop the stream task now + streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); + + // commit the update + streamMetaWLock(pMeta); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); + + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + streamMetaWUnLock(pMeta); + + return 0; +} + +int32_t startStreamTasks(SStreamMeta* pMeta) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pMeta->vgId; + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + SArray* pTaskList = 
NULL; + streamMetaWLock(pMeta); + pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + taosHashClear(pMeta->startInfo.pReadyTaskSet); + taosHashClear(pMeta->startInfo.pFailedTaskSet); + pMeta->startInfo.startTs = taosGetTimestampMs(); + streamMetaWUnLock(pMeta); + + // broadcast the check downstream tasks msg + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + // fill-history task can only be launched by related stream tasks. + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + if (pTask->status.downstreamReady == 1) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", + pTask->id.idStr); + streamLaunchFillHistoryTask(pTask); + } + + streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? 
TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + return code; +} + +int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + streamTaskResetStatus(*pTask); + } + + return 0; +} + +static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); + + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + + terrno = 0; + tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, + pMeta->updateInfo.transId); + + while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + code = streamMetaReopen(pMeta); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + streamMetaInitBackend(pMeta); + int64_t el = taosGetTimestampMs() - st; + + tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + + code = streamMetaLoadAllTasks(pMeta); + if 
(code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); + streamMetaWUnLock(pMeta); + code = terrno; + return code; + } + + if (isLeader && !tsDisableStream) { + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + resetStreamTaskStatus(pMeta); + + streamMetaWUnLock(pMeta); + startStreamTasks(pMeta); + } else { + streamMetaResetStartInfo(&pMeta->startInfo); + streamMetaWUnLock(pMeta); + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + + code = terrno; + return code; +} + +int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { + SStreamTaskRunReq* pReq = pMsg->pCont; + + int32_t taskId = pReq->taskId; + int32_t vgId = pMeta->vgId; + + if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { + startStreamTasks(pMeta); + return 0; + } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { + restartStreamTasks(pMeta, isLeader); + return 0; + } + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); + if (pTask != NULL) { // even in halt status, the data in inputQ must be processed + char* p = NULL; + if (streamTaskReadyToRun(pTask, &p)) { + tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.nextProcessVer); + streamExecTask(pTask); + } else { + int8_t status = streamTaskSetSchedStatusInactive(pTask); + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, + pTask->id.idStr, p, status); + } + + streamMetaReleaseTask(pMeta, pTask); + return 0; + } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. 
+ // todo add one function to handle this + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); + return -1; + } +} + + diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8cbca403e3..1f951097a4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -463,7 +463,6 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { break; } -_exit: if (code) { vError("vgId:%d, failed to preprocess write request since %s, msg type:%s", TD_VID(pVnode), tstrerror(code), TMSG_INFO(pMsg->msgType)); @@ -767,7 +766,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: - return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); + return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 3944f8ed91..817d5124a2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -18,6 +18,7 @@ #include "sync.h" #include "tsdb.h" #include "vnd.h" +#include "tqCommon.h" #define BATCH_ENABLE 0 @@ -569,8 +570,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); - tqResetStreamTaskStatus(pVnode->pTq); - tqStartStreamTaskAsync(pVnode->pTq, false); + resetStreamTaskStatus(pVnode->pTq->pStreamMeta); + tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); 
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ab7951bb92..1c874f34de 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -276,6 +276,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code)); + terrno = code; return code; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 469813defc..71f1e9b45b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -409,7 +409,6 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, do not do check downstream again", id); @@ -461,14 +460,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", " "not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); + addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "downstream again, nodeUpdate needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); } else { // 
TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 0623860bd9..db0217f000 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -791,4 +791,5 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->sinkDataSize = pSrc->sinkDataSize; pDst->activeCheckpointId = pSrc->activeCheckpointId; pDst->checkpointFailed = pSrc->checkpointFailed; -} \ No newline at end of file +} + diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 229387a1b6..15aca85fc2 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -648,6 +648,7 @@ int taosOpenFileNotStream(const char *path, int32_t tdFileOptions) { access |= (tdFileOptions & TD_FILE_TEXT) ? O_TEXT : 0; access |= (tdFileOptions & TD_FILE_EXCL) ? O_EXCL : 0; access |= (tdFileOptions & TD_FILE_CLOEXEC) ? O_CLOEXEC : 0; + int fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO); return fd; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7ff42ad05e..8cb23c081f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -299,6 +299,8 @@ e ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionUS.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionNS.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4219.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py +,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_tag_index.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py diff --git a/tests/system-test/1-insert/test_td27388.py b/tests/system-test/1-insert/test_td27388.py new file mode 100644 index 0000000000..7b49a63dbb --- /dev/null +++ 
b/tests/system-test/1-insert/test_td27388.py
@@ -0,0 +1,107 @@
+import random
+import string
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.sqlset import *
+from util import constant
+from util.common import *
+
+
+class TDTestCase:
+    """Verify that malformed INSERT statements are rejected (TD-27388).
+
+    Every statement listed in the test methods is passed to tdSql.error(),
+    which presumably asserts that executing the statement fails.
+    """
+
+    def init(self, conn, logSql, replicaVar=1):
+        """Open a cursor and create the database and super table used by the tests."""
+        self.replicaVar = int(replicaVar)
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor())
+        self.dbname = 'db'
+        self.stbname = 'st'
+        tdSql.execute("create database {};".format(self.dbname))
+        tdSql.execute("use {};".format(self.dbname))
+        tdSql.execute("create table st (ts timestamp, col1 int, col2 varchar(64)) tags (t1 int, t2 varchar(32));")
+
+    def test_half_quotes(self):
+        """Run statements in which a tag or column string has an unbalanced single quote."""
+        sql_list = [
+            "insert into t1 using st tags(1, 'tag1) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, tag1') values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg);",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, test msg');",
+            "insert into t1 using st tags(1, 'tag1' values(now, 1, test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg)';",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg);",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2) values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, tag2') values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1) values(now, 1, 'test msg') t2 using st tags(2, 'tag2) values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg) t2 using st tags(2, 'tag2') values(now, 2, test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2') values(now, 2, 'test msg);",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2') values(now, 2, 'test msg);"
+        ]
+        for sql in sql_list:
+            tdLog.debug("execute half quotes sql: %s" % sql)
+            tdSql.error(sql)
+
+    def test_esc(self):
+        """Run statements with a backslash placed directly before an opening or closing quote."""
+        sql_list = [
+            "insert into t1 using st tags(1, 'tag1\\') values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, \\'tag1') values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg\\');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, \\'test msg');",
+            "insert into t1 using st tags(1, \\'tag1\\') values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, \\'test msg\\');",
+            "insert into t1 using st tags(1, \\'tag1\\') values(now, 1, \\'test msg\\');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2\\') values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, \\'tag2') values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2') values(now, 2, \\'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2') values(now, 2, 'test msg\\');",
+            "insert into t1 using st tags(1, \\'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2\\') values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2') values(now, 2, \\'test msg\\');"
+        ]
+        for sql in sql_list:
+            tdLog.debug("execute escape character sql: %s" % sql)
+            tdSql.error(sql)
+
+    def test_specific_character(self):
+        """Run statements in which a special character replaces or follows a string's closing quote."""
+        sql_list = [
+            "insert into t1 using st tags(1, 'tag1$) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1,) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1'') values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1() values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1*) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1+) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1,) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1-) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1.) values(now, 1, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg$);",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg,);",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg+%+-.);",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2$) values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2,) values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2'') values(now, 2, 'test msg');",
+            "insert into t1 using st tags(1, 'tag1') values(now, 1, 'test msg') t2 using st tags(2, 'tag2() values(now, 2, 'test msg');"
+        ]
+        for sql in sql_list:
+            tdLog.debug("execute specific character sql: %s" % sql)
+            tdSql.error(sql)
+
+    def run(self):
+        """Execute the three groups of malformed statements in order."""
+        self.test_half_quotes()
+        self.test_esc()
+        self.test_specific_character()
+
+    def stop(self):
+        """Close the tdSql connection and log success."""
+        tdSql.close()
+        tdLog.success("%s successfully executed" % __file__)
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
diff --git a/tests/system-test/1-insert/test_ts4295.py b/tests/system-test/1-insert/test_ts4295.py
new file mode 100644
index 0000000000..89e445f3c1
--- /dev/null
+++ b/tests/system-test/1-insert/test_ts4295.py
@@ -0,0 +1,54 @@
+import os
+import sys
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.dnodes import tdDnodes
+from math import inf
+import taos
+
+class TDTestCase:
+    """Verify bulk inserts of varbinary data followed by a flush (ts-4295)."""
+
+    def init(self, conn, logSql, replicaVer=1):
+        """Open a cursor and record the database and super table names."""
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor(), True)
+        self.conn = conn
+        self.db_name = "db"
+        self.stable_name = "st"
+
+    def run(self):
+        """Create ten child tables, insert 100 rounds of 10000 rows into each, then flush."""
+        tdSql.execute("create database if not exists %s" % self.db_name)
+        tdSql.execute("use %s" % self.db_name)
+        # create super table
+        tdSql.execute("create table %s (ts timestamp, c1 varbinary(32)) tags (t1 int)" % self.stable_name)
+        # create child table
+        child_table_list = []
+        for i in range(10):
+            child_table_name = "ct_" + str(i+1)
+            child_table_list.append(child_table_name)
+            tdSql.execute("create table %s using %s tags(%s);" % (child_table_name, self.stable_name, str(i+1)))
+            tdLog.info("create table %s successfully" % child_table_name)
+        # the statement text is identical in every round, so build it once;
+        # "table_name" is a placeholder replaced per child table below
+        values = "".join("(now+%ss, '0x7661726331')," % str(j+1) for j in range(10000))
+        sql = "insert into table_name values" + values
+        # insert data
+        for i in range(100):
+            for child_table in child_table_list:
+                tdSql.execute(sql.replace("table_name", child_table))
+                tdLog.info("Insert data into %s successfully" % child_table)
+            tdLog.info("Insert data round %s successfully" % str(i+1))
+        # NOTE(review): flush assumed to run once, after all insert rounds - confirm
+        tdSql.execute("flush database %s" % self.db_name)
+
+    def stop(self):
+        """Drop the test database, close the tdSql connection and log success."""
+        tdSql.execute("drop database if exists %s" % self.db_name)
+        tdSql.close()
+        tdLog.success("%s successfully executed" % __file__)
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())