diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index ed7cbc8125..ce7d038d42 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -29,7 +29,6 @@ extern "C" { #define DS_BUF_FULL 2 #define DS_BUF_EMPTY 3 -struct SDataSink; struct SSDataBlock; typedef struct SDeleterRes { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 103f807191..aade34e965 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -335,18 +335,17 @@ struct SStreamTask { // meta typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasks; - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - SRWLatch lock; - int8_t walScan; - bool quit; + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasks; + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + SRWLatch lock; + int8_t walScan; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -359,10 +358,6 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); bool tInputQueueIsFull(const SStreamTask* pTask); -static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); -} - typedef struct { SMsgHead head; int64_t streamId; @@ -538,6 +533,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); // int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c007f84790..db17e4f533 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -179,6 +179,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqStreamTasksScanWal(STQ* pTq); // tq util +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ce4efe905f..8f26d5868c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -20,6 +20,8 @@ // 2: wait to be inited or cleaup #define WAL_READ_TASKS_ID (-1) +static int32_t tqInitialize(STQ* pTq); + int32_t tqInit() { int8_t old; while (1) { @@ -109,25 +111,32 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); + tqInitialize(pTq); + return pTq; +} + +int32_t tqInitialize(STQ* pTq) { if (tqMetaOpen(pTq) < 0) { - return NULL; + return -1; } pTq->pOffsetStore = tqOffsetOpen(pTq); if (pTq->pOffsetStore == NULL) { - return NULL; + return -1; } - pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId); if (pTq->pStreamMeta == NULL) { - return NULL; + return -1; } - if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) { - return NULL; + // the version is kept in task's meta data + // todo check if this version is required or not + if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pTq->pVnode->pWal)) < 0) { + return -1; } - return pTq; + return 0; } void tqClose(STQ* pTq) { @@ -548,12 +557,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { - // todo extract method - char buf[128] = {0}; - sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); - int32_t vgId = TD_VID(pTq->pVnode); - pTask->id.idStr = taosStrdup(buf); + pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); @@ -633,8 +638,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + + // next valid version will add one + pTask->chkInfo.version += 1; return 0; } @@ -750,7 +758,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tDecoderClear(&decoder); - // 2.save task + // 2.save task, use the newest commit version as the initial start version of stream task. code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, @@ -1278,6 +1286,13 @@ int32_t tqStartStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; taosWLockLatch(&pMeta->lock); + int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); + if (numOfTasks == 0) { + tqInfo("vgId:%d no stream tasks exists", vgId); + taosWUnLockLatch(&pTq->pStreamMeta->lock); + return 0; + } + pMeta->walScan += 1; if (pMeta->walScan > 1) { @@ -1294,8 +1309,6 @@ int32_t tqStartStreamTasks(STQ* pTq) { return -1; } - int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); - tqInfo("vgId:%d start wal scan stream tasks, tasks:%d", vgId, numOfTasks); initOffsetForAllRestoreTasks(pTq); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6ed74ddcc3..cba51cdee4 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -52,9 +52,6 @@ int tqStreamTasksScanWal(STQ* pTq) { double el = (taosGetTimestampMs() - st) / 1000.0; tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el); - - // restore wal scan flag -// atomic_store_8(&pTq->pStreamMeta->walScan, 0); return 0; } @@ -99,8 +96,8 @@ int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetSto continue; } - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || - pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); continue; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 4c37e1052f..5ac747947f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -19,6 +19,12 @@ static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp); +char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { + char buf[128] = {0}; + sprintf(buf, "0x%" PRIx64 "-%d", streamId, taskId); + return taosStrdup(buf); +} + // stream_task:stream_id:task_id void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { int32_t n = 12; @@ -144,6 +150,21 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { + if (pRsp->blockData != NULL) { + pRsp->blockData = taosArrayDestroy(pRsp->blockData); + } + + if (pRsp->blockDataLen != NULL) { + pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); + } + + if (pRsp->blockTbName != NULL) { + pRsp->blockTbName = taosArrayDestroy(pRsp->blockTbName); + } + + if (pRsp->blockSchema != NULL) { + pRsp->blockSchema = taosArrayDestroy(pRsp->blockSchema); + } return -1; } @@ -277,6 +298,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { + tDeleteSTaosxRsp(&taosxRsp); return -1; } @@ -352,6 +374,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tDeleteSTaosxRsp(&taosxRsp); return code; } + code = 0; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 89686c3d33..362dc51ad5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1881,8 +1881,8 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_ return pReader->pSchema; } - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); - if (pReader->pSchema == NULL) { + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); + if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) { tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr); } @@ -1890,9 +1890,15 @@ static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_ } static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) { + int32_t code = 0; + // always set the newest schema version in pReader->pSchema if (pReader->pSchema == NULL) { - pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1); + code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } } if (pReader->pSchema && sversion == pReader->pSchema->version) { @@ -1905,7 +1911,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } STSchema* ptr = NULL; - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -1969,7 +1975,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* // DESC: mem -----> imem -----> last block -----> file block if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { - init = true; + init = true; // todo check if pReader->pSchema is null or not int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2014,6 +2020,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { init = true; STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } + int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2222,6 +2232,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (pSchema == NULL) { return code; } + STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); if (piSchema == NULL) { return code; @@ -3843,11 +3854,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return terrno; } - if (pReader->pSchema == NULL) { - pReader->pSchema = pTSchema; - } - - code = tsdbRowMergerInit(&merge, pReader->pSchema, ¤t, pTSchema); + STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema; + code = tsdbRowMergerInit(&merge, ps, ¤t, pTSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3891,7 +3899,14 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } + STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); + if (piSchema == NULL) { + return terrno; + } if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema); @@ -4000,10 +4015,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT int64_t uid = pScanInfo->uid; int32_t code = TSDB_CODE_SUCCESS; - int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); - SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid); + if (pSchema == NULL) { + return terrno; + } SColVal colVal = {0}; int32_t i = 0, j = 0; @@ -5187,8 +5203,6 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { } int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { - int32_t sversion = 1; - SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); int32_t code = metaGetTableEntryByUidCache(&mr, uid); @@ -5200,6 +5214,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 *suid = 0; + // only child table and ordinary table is allowed, super table is not allowed. if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); *suid = mr.me.ctbEntry.suid; @@ -5209,9 +5224,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 metaReaderClear(&mr); return terrno; } - sversion = mr.me.stbEntry.schemaRow.version; - } else if (mr.me.type == TSDB_NORMAL_TABLE) { - sversion = mr.me.ntbEntry.schemaRow.version; + } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else { terrno = TSDB_CODE_INVALID_PARA; metaReaderClear(&mr); @@ -5219,9 +5232,10 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 } metaReaderClear(&mr); - *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion, 1); - return TSDB_CODE_SUCCESS; + // get the newest table schema version + code = metaGetTbTSchemaEx(pVnode->pMeta, *suid, uid, -1, pSchema); + return code; } int32_t tsdbTakeReadSnap(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d4a394b584..dc2d709d76 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); + vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId); // start to restore all stream tasks tqStartStreamTasks(pVnode->pTq); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 9b8f034e44..2e92f9e396 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -108,6 +108,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); +void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2cb6626b03..85424fd7de 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -427,7 +427,6 @@ typedef struct STimeWindowAggSupp { } STimeWindowAggSupp; typedef struct SStreamScanInfo { - uint64_t tableUid; // queried super table uid SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; SExprSupp tbnameCalSup; diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index dc3bf83a91..d693faf7f1 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -240,6 +240,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { + taosMemoryFree(pParam); code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index e9c46843c0..90d740bebd 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -408,6 +408,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat void* pParam) { SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle)); if (NULL == inserter) { + taosMemoryFree(pParam); terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7d318786ba..f61fd1ae01 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -27,15 +27,21 @@ #include "executorimpl.h" #include "tcompression.h" +typedef struct STableListIdInfo { + uint64_t suid; + uint64_t uid; + int32_t tableType; +} STableListIdInfo; + // If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly // The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups struct STableListInfo { - bool oneTableForEachGroup; - int32_t numOfOuputGroups; // the data block will be generated one by one - int32_t* groupOffset; // keep the offset value for each group in the tableList - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - uint64_t suid; + bool oneTableForEachGroup; + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + STableListIdInfo idInfo; // this maybe the super table or ordinary table }; typedef struct tagFilterAssist { @@ -470,7 +476,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } // int64_t stt = taosGetTimestampUs(); - code = metaGetTableTags(metaHandle, pTableListInfo->suid, pUidTagList); + code = metaGetTableTags(metaHandle, pTableListInfo->idInfo.suid, pUidTagList); if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -953,7 +959,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN FilterCondType condType = checkTagCond(pTagCond); - int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->suid, pUidTagList, pTagCond); + int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->idInfo.suid, pUidTagList, pTagCond); if (filter == 0) { // tbname in filter is activated, do nothing and return taosArrayClear(pUidList); @@ -966,12 +972,12 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN terrno = 0; } else { if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) { - code = metaGetTableTagsByUids(metaHandle, pListInfo->suid, pUidTagList); + code = metaGetTableTagsByUids(metaHandle, pListInfo->idInfo.suid, pUidTagList); } else { - code = metaGetTableTags(metaHandle, pListInfo->suid, pUidTagList); + code = metaGetTableTags(metaHandle, pListInfo->idInfo.suid, pUidTagList); } if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->suid); + qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid); terrno = code; goto end; } @@ -1025,15 +1031,17 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, int32_t code = TSDB_CODE_SUCCESS; size_t numOfTables = 0; - pListInfo->suid = pScanNode->suid; + pListInfo->idInfo.suid = pScanNode->suid; + pListInfo->idInfo.tableType = pScanNode->tableType; + SArray* pUidList = taosArrayInit(8, sizeof(uint64_t)); SIdxFltStatus status = SFLT_NOT_INDEX; if (pScanNode->tableType != TSDB_SUPER_TABLE) { + pListInfo->idInfo.uid = pScanNode->uid; if (metaIsTableExist(metaHandle, pScanNode->uid)) { taosArrayPush(pUidList, &pScanNode->uid); } - code = doFilterByTagCond(pListInfo, pUidList, pTagCond, metaHandle, status); if (code != TSDB_CODE_SUCCESS) { goto _end; @@ -1794,7 +1802,9 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) { return taosArrayGetSize(pTableList->pTableList); } -uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->suid; } +uint64_t tableListGetSuid(const STableListInfo* pTableList) { + return pTableList->idInfo.suid; +} STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) { if (taosArrayGetSize(pTableList->pTableList) == 0) { @@ -1819,6 +1829,12 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st return -1; } +void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) { + *psuid = pTableList->idInfo.suid; + *uid = pTableList->idInfo.uid; + *type = pTableList->idInfo.tableType; +} + uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); ASSERT(pTableList->map != NULL && slot != NULL); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6e3a7d8725..1732ec04a7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,6 +14,7 @@ */ #include "executor.h" +#include #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" @@ -327,6 +328,13 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S return qa; } + STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; + + uint64_t suid = 0; + uint64_t uid = 0; + int32_t type = 0; + tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type); + // let's discard the tables those are not created according to the queried super table. SMetaReader mr = {0}; metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); @@ -341,9 +349,21 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S tDecoderClear(&mr.coder); - // TODO handle ntb case - if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) { + if (mr.me.type == TSDB_SUPER_TABLE) { continue; + } else { + if (type == TSDB_SUPER_TABLE) { + // this new created child table does not belong to the scanned super table. + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) { + continue; + } + } else { // ordinary table + // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we + // should check all newly created ordinary table to make sure that this table isn't the destination table. + if (mr.me.uid != uid) { + continue; + } + } } if (pScanInfo->pTagCond != NULL) { @@ -382,7 +402,7 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id - SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); + SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, id); int32_t numOfQualifiedTables = taosArrayGetSize(qa); qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); @@ -497,10 +517,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } + // pSinkParam has been freed during create sinker. code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pSinkParam); - } } qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ae396a4c68..2389c7252e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -690,7 +690,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } uint32_t status = 0; - int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); + code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -2441,7 +2441,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } pInfo->readHandle = *pHandle; - pInfo->tableUid = pScanPhyNode->uid; pTaskInfo->streamInfo.snapshotVer = pHandle->version; pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup); blockDataEnsureCapacity(pInfo->pCreateTbRes, 8); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7171b52912..0f000f1f50 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,7 +16,7 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 2000 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 100000 int32_t streamInit() { int8_t old; @@ -352,4 +352,8 @@ void* streamQueueNextItem(SStreamQueue* queue) { } return streamQueueCurItem(queue); } +} + +void streamTaskInputFail(SStreamTask* pTask) { + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a9f6d29bf5..549374ed94 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -230,23 +230,21 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) { - goto FAIL; + rpcFreeCont(buf); + return code; } + tEncoderClear(&encoder); msg.contLen = tlen + sizeof(SMsgHead); msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("dispatch from s-task:%s to downstream s-task:%"PRIx64":%d node %d: check msg", pTask->id.idStr, - pReq->streamId, pReq->downstreamTaskId, nodeId); + qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, + pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); - return 0; -FAIL: - if (buf) rpcFreeCont(buf); - return code; } int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3d896c08ac..9a6ff302ef 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -189,7 +189,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); streamDispatch(pTask); } - if (finished) break; + + if (finished) { + break; + } } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c3a7b70904..c9ea0c382a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -298,8 +298,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); - // todo set correct initial version. - if (pMeta->expandFunc(pMeta->ahandle, pTask, 0) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 03afc0692d..55c745e417 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -102,8 +102,10 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .downstreamNodeId = pRsp->downstreamNodeId, .childId = pRsp->childId, }; - qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->id.taskId, pTask->nodeId, + + qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -116,6 +118,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp } } } + return 0; } @@ -158,9 +161,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* } else { ASSERT(0); } - } else { + } else { // not ready, it should wait for at least 100ms and then retry streamRecheckOneDownstream(pTask, pRsp); } + return 0; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index a49ff0cd5b..6edee27c05 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -217,8 +217,8 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem int32_t queueNum = taosGetQueueNumber(pool->qset); int32_t curWorkerNum = taosArrayGetSize(pool->workers); - int32_t dstWorkerNum = ceil(queueNum * pool->ratio); - if (dstWorkerNum < 2) dstWorkerNum = 2; + int32_t dstWorkerNum = ceilf(queueNum * pool->ratio); + if (dstWorkerNum < 1) dstWorkerNum = 1; // spawn a thread to process queue while (curWorkerNum < dstWorkerNum) { diff --git a/tests/system-test/1-insert/delete_childtable.py b/tests/system-test/1-insert/delete_childtable.py index e3144edb45..a12f884981 100644 --- a/tests/system-test/1-insert/delete_childtable.py +++ b/tests/system-test/1-insert/delete_childtable.py @@ -27,7 +27,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__) - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.dbname = 'db_test' self.setsql = TDSetSql() self.stbname = 'stb'