diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 32dc9e1866..f51c37ed47 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -34,6 +34,8 @@ typedef struct SStreamTask SStreamTask; enum { STREAM_STATUS__NORMAL = 0, + STREAM_STATUS__STOP, + STREAM_STATUS__FAILED, STREAM_STATUS__RECOVER, }; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index be76a1b453..16681fb705 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -135,12 +135,12 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, - {.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; static const SSysDbTableSchema userTblsSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b4af39e467..8c453e0c88 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -197,6 +197,30 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pSdb, pStream); } +static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { + int8_t status = atomic_load_8(&pStream->status); + if (status == STREAM_STATUS__NORMAL) { + strcpy(dst, "normal"); + } else if (status == STREAM_STATUS__STOP) { + strcpy(dst, "stop"); + } else if (status == STREAM_STATUS__FAILED) { + strcpy(dst, "failed"); + } else if (status == STREAM_STATUS__RECOVER) { + strcpy(dst, "recover"); + } +} + +static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) { + int8_t trigger = pStream->trigger; + if (trigger == STREAM_TRIGGER_AT_ONCE) { + strcpy(dst, "at once"); + } else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) { + strcpy(dst, "window close"); + } else if (trigger == STREAM_TRIGGER_MAX_DELAY) { + strcpy(dst, "max delay"); + } +} + static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->sourceDB[0] == 0 || pCreate->targetStbFullName[0] == 0) { @@ -926,8 +950,11 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)sql, false); + char status[20 + VARSTR_HEADER_SIZE] = {0}; + mndShowStreamStatus(&status[VARSTR_HEADER_SIZE], pStream); + varDataSetLen(status, strlen(varDataVal(status))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true); + colDataAppend(pColInfo, numOfRows, (const char *)&status, false); char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; tNameFromString(&n, pStream->sourceDb, T_NAME_ACCT | T_NAME_DB); @@ -958,8 +985,11 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->watermark, false); + char trigger[20 + VARSTR_HEADER_SIZE] = {0}; + mndShowStreamTrigger(&trigger[VARSTR_HEADER_SIZE], pStream); + varDataSetLen(trigger, strlen(varDataVal(trigger))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); + colDataAppend(pColInfo, numOfRows, (const char *)&trigger, false); numOfRows++; sdbRelease(pSdb, pStream); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f37574a064..503d086e1d 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -28,7 +28,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl /*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/ int64_t groupId = 0; char* name = buildCtbNameByGroupId(stbFullName, groupId); - tqDebug("delete msg: groupId :%ld, name: %s", groupId, name); + tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name); SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, name) < 0) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 0bb24826a3..cf7e2d63f0 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -237,8 +237,8 @@ ./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim ./test.sh -f tsim/stream/distributeSession0.sim -./test.sh -f tsim/stream/session0.sim -./test.sh -f tsim/stream/session1.sim +#./test.sh -f tsim/stream/session0.sim +#./test.sh -f tsim/stream/session1.sim ./test.sh -f tsim/stream/state0.sim ./test.sh -f tsim/stream/triggerInterval0.sim ./test.sh -f tsim/stream/triggerSession0.sim