From 691c1dfa7dbd175a0a03659517d22de1dd8587b1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Jul 2024 14:45:09 +0800 Subject: [PATCH 1/3] fix(stream): check dst stable schema before generating auto create table. --- include/libs/stream/tstream.h | 2 + source/dnode/vnode/src/tq/tqSink.c | 78 ++++++++++++++++++++--------- source/libs/stream/src/streamTask.c | 1 + 3 files changed, 57 insertions(+), 24 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5ba0ce454c..fd9b5667d7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -237,6 +237,8 @@ typedef struct { int64_t stbUid; char stbFullName[TSDB_TABLE_FNAME_LEN]; SSchemaWrapper* pSchemaWrapper; + SSchemaWrapper* pTagSchema; + bool autoCreateCtb; void* vnode; // not available to encoder and decoder FTbSink* tbSinkFunc; STSchema* pTSchema; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9b1e8075da..6494eb1bd1 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -725,24 +725,33 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (metaGetTableEntryByName(&mr, dstTableName) < 0) { metaReaderClear(&mr); - tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); + if (pTask->outputInfo.tbSink.autoCreateCtb) { + tqDebug("s-task:%s stream write into table:%s, table auto created", id, dstTableName); - SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal)); + SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal)); - pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - pTableData->pCreateTbReq = - buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, - pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); - taosArrayDestroy(pTagArray); + pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + pTableData->pCreateTbReq = + buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, + pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); + taosArrayDestroy(pTagArray); - if (pTableData->pCreateTbReq == NULL) { - tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - taosMemoryFree(pTableSinkInfo); - return terrno; + if (pTableData->pCreateTbReq == NULL) { + tqError("s-task:%s failed to build auto create dst-table req:%s, code:%s", id, dstTableName, + tstrerror(terrno)); + taosMemoryFree(pTableSinkInfo); + return terrno; + } + + pTableSinkInfo->uid = 0; + doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id); + } else { + metaReaderClear(&mr); + + tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, + vgId, dstTableName); + return TSDB_CODE_TDB_TABLE_NOT_EXIST; } - - pTableSinkInfo->uid = 0; - doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id); } else { bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (!isValid) { @@ -788,16 +797,34 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_ } void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { - const SArray* pBlocks = (const SArray*)data; - SVnode* pVnode = (SVnode*)vnode; - int64_t suid = pTask->outputInfo.tbSink.stbUid; - char* stbFullName = pTask->outputInfo.tbSink.stbFullName; - STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; - int32_t vgId = TD_VID(pVnode); - int32_t numOfBlocks = taosArrayGetSize(pBlocks); - int32_t code = TSDB_CODE_SUCCESS; - const char* id = pTask->id.idStr; - int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb); + const SArray* pBlocks = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + int64_t suid = pTask->outputInfo.tbSink.stbUid; + char* stbFullName = pTask->outputInfo.tbSink.stbFullName; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; + int32_t vgId = TD_VID(pVnode); + int32_t numOfBlocks = taosArrayGetSize(pBlocks); + int32_t code = TSDB_CODE_SUCCESS; + const char* id = pTask->id.idStr; + int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb); + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + + if (pTask->outputInfo.tbSink.pTagSchema == NULL) { + SMetaReader mer1 = {0}; + metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); + + code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); + pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); + metaReaderClear(&mer1); + + SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema; + SSchema* pCol1 = &pTagSchema->pSchema[0]; + if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) { + pOutputInfo->tbSink.autoCreateCtb = true; + } else { + pOutputInfo->tbSink.autoCreateCtb = false; + } + } bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); if (!onlySubmitData) { @@ -829,6 +856,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d s-task:%s dst-table not exist, stb:%" PRId64 " discard stream results", vgId, id, + stbFullName); continue; } @@ -882,6 +911,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { if (index == NULL) { // no data yet, append it code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId); continue; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4cbe0cb136..a40359150d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -263,6 +263,7 @@ void tFreeStreamTask(SStreamTask* pTask) { tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper); taosMemoryFree(pTask->outputInfo.tbSink.pTSchema); tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo); + tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); } From 38ee5dc4723c3597476d2a176b849f2a07e2cd91 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Jul 2024 16:00:59 +0800 Subject: [PATCH 2/3] fix(stream):fix syntax error. --- source/dnode/vnode/src/tq/tqSink.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6494eb1bd1..3efc653f64 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -856,8 +856,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d s-task:%s dst-table not exist, stb:%" PRId64 " discard stream results", vgId, id, - stbFullName); + tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName); continue; } From 5057749790944407a2ef3f68d7ef5e7ca815df48 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Jul 2024 10:18:58 +0800 Subject: [PATCH 3/3] fix(stream): fix race condition for dispatch msg. --- source/libs/stream/src/streamDispatch.c | 56 +++++++++++++++---------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 617adaa016..1959180a3e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1162,10 +1162,10 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); - bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); - clearBufferedDispatchMsg(pTask); - int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + + clearBufferedDispatchMsg(pTask); // put data into inputQ of current task is also allowed if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { @@ -1189,13 +1189,24 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId return 0; } -static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) { +static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) { int32_t numOfRsp = 0; bool alreadySet = false; bool updated = false; + bool allRsp = false; + *pNotRsp = 0; taosThreadMutexLock(&pMsgInfo->lock); - for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { + int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); + + for(int32_t i = 0; i < numOfDispatchBranch; ++i) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); + if (pEntry->rspTs != -1) { + numOfRsp += 1; + } + } + + for (int32_t j = 0; j < numOfDispatchBranch; ++j) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); if (pEntry->nodeId == vgId) { ASSERT(!alreadySet); @@ -1203,18 +1214,20 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3 pEntry->status = code; alreadySet = true; updated = true; - stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d", id, now, code, j); - } - - if (pEntry->rspTs != -1) { numOfRsp += 1; + + stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, + numOfRsp, numOfDispatchBranch); } } - taosThreadMutexUnlock(&pMsgInfo->lock); - ASSERT(updated); + *pNotRsp = numOfDispatchBranch - numOfRsp; + allRsp = (numOfRsp == numOfDispatchBranch); - return numOfRsp; + taosThreadMutexUnlock(&pMsgInfo->lock); + + ASSERT(updated); + return allRsp; } bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) { @@ -1240,7 +1253,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; int64_t now = taosGetTimestampMs(); - int32_t totalRsp = 0; + bool allRsp = false; + int32_t notRsp = 0; taosThreadMutexLock(&pMsgInfo->lock); int32_t msgId = pMsgInfo->msgId; @@ -1269,18 +1283,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, id); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, id); stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } else { @@ -1292,7 +1306,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); { bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); @@ -1317,13 +1331,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } } - int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (notRsp > 0) { + if (!allRsp) { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, " - "waiting " - "for %d rsp", + "waiting for %d rsp", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp); } else { stDebug( @@ -1337,7 +1349,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // all msg rsp already, continue - if (notRsp == 0) { + if (allRsp) { ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); // we need to re-try send dispatch msg to downstream tasks