diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 7cd33955c1..12ec97080f 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -160,7 +160,7 @@ static const SSysDbTableSchema streamSchema[] = { static const SSysDbTableSchema streamTaskSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "task_id", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a3f03a46a9..be6cfae0be 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1221,12 +1221,16 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); - if (pShow->pIter == NULL) break; + if (pShow->pIter == NULL) { + break; + } // lock taosRLockLatch(&pStream->lock); + // count task num int32_t sz = taosArrayGetSize(pStream->tasks); + int32_t count = 0; for (int32_t i = 0; i < sz; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -1236,10 +1240,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock if (numOfRows + count > rowsCapacity) { blockDataEnsureCapacity(pBlock, numOfRows + count); } + // add row for each task for (int32_t i = 0; i < sz; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); int32_t levelCnt = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < levelCnt; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); @@ -1249,12 +1255,19 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // stream name char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); // task id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false); + + char idstr[128] = {0}; + int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); + idstr[2] = '0'; + idstr[3] = 'x'; + varDataSetLen(idstr, len + 2); + colDataSetVal(pColInfo, numOfRows, idstr, false); // node type char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; @@ -1283,8 +1296,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { memcpy(varDataVal(level), "sink", 4); varDataSetLen(level, 4); - } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&level, false); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d5e32c7cd..8ddf920985 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1203,8 +1203,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; - tqDebug("s-task:%s without associated stream task, reset the time window:%" PRId64 " - %" PRId64, pId, - pWindow->skey, pWindow->ekey); + tqDebug("s-task:%s no associated task, reset the time window:%" PRId64 " - %" PRId64, pId, pWindow->skey, + pWindow->ekey); } else { tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 833f59fe8d..ea7327f741 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -207,7 +207,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; - bool noNewDataInWal = true; + bool noDataInWal = true; int32_t vgId = pStreamMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); @@ -235,7 +235,6 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { -// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->info.taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -261,36 +260,44 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue); + // append the data for the stream SStreamQueueItem* pItem = NULL; code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); - if (code != TSDB_CODE_SUCCESS) { // failed, continue + + if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; } - // delete ignore - if (pItem == NULL) { - streamMetaReleaseTask(pStreamMeta, pTask); - continue; + noDataInWal = false; + + if (pItem != NULL) { + code = tAppendDataToInputQueue(pTask, pItem); + if (code == TSDB_CODE_SUCCESS) { + pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, + pTask->chkInfo.currentVer); + } else { + tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, + pTask->chkInfo.currentVer); + } } - noNewDataInWal = false; - - code = tqAddInputBlockNLaunchTask(pTask, pItem); - if (code == TSDB_CODE_SUCCESS) { - pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pTask->chkInfo.currentVer); - } else { - tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); + if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) { + code = streamSchedExec(pTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaReleaseTask(pStreamMeta, pTask); + return -1; + } } streamMetaReleaseTask(pStreamMeta, pTask); } // all wal are checked, and no new data available in wal. - if (noNewDataInWal) { + if (noDataInWal) { *pScanIdle = true; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ad79bc87d7..fab608c2b5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -131,10 +131,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id); } else { pOperator->status = OP_NOT_OPENED; - SStreamScanInfo* pInfo = pOperator->info; - qDebug("s-task:%s in this batch, all %d blocks need to be processed and dump results", id, (int32_t)numOfBlocks); + qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index a1cc74054a..b5c83bc10b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -113,6 +113,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qError("failed to create msg to aunch s-task:%s, reason out of memory", pTask->id.idStr); return -1; } @@ -170,8 +171,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, - pReq->srcTaskId, pReq->reqId); + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, + pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->srcVgId = 0; @@ -308,9 +309,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("s-task:%s receive retrieve req from taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->srcTaskId, pReq->srcNodeId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); - ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK); streamSchedExec(pTask); return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4768f5aed3..3c150e924f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -111,7 +111,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); numOfBlocks += 1; - qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, + + qDebug("s-task:%s(child %d) retrieve process completed, reqId:0x%" PRIx64" dump results", pTask->id.idStr, pTask->info.selfChildId, pRetrieveBlock->reqId); } diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 7617233dc6..18893245fa 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -575,7 +575,6 @@ endi $loop_count = 0 print step 7 - sql create database test3 vgroups 6; sql use test3; sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int); diff --git a/tests/system-test/1-insert/db_tb_name_check.py b/tests/system-test/1-insert/db_tb_name_check.py index 23bb539620..fa43603e25 100644 --- a/tests/system-test/1-insert/db_tb_name_check.py +++ b/tests/system-test/1-insert/db_tb_name_check.py @@ -44,7 +44,7 @@ class TDTestCase: new_dbname = list(dbname) new_dbname.insert(i,j) dbname_1 = ''.join(new_dbname) - tdSql.execute(f'create database if not exists `{dbname_1}`') + tdSql.execute(f'create database if not exists `{dbname_1}` vgroups 1 replica 1') tdSql.query('select * from information_schema.ins_databases') tdSql.checkEqual(tdSql.queryResult[2][0],str(dbname_1)) tdSql.execute(f'drop database `{dbname_1}`') @@ -56,7 +56,7 @@ class TDTestCase: def tb_name_check(self): dbname = tdCom.getLongName(10) - tdSql.execute(f'create database if not exists `{dbname}`') + tdSql.execute(f'create database if not exists `{dbname}` vgroups 1 replica 1') tdSql.execute(f'use `{dbname}`') tbname = tdCom.getLongName(5) for i in self.special_name: