diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4a3f0d3a70..613e33fec7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -788,7 +788,7 @@ int32_t* taosGetErrno(); // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) -#define TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE TAOS_DEF_ERROR_CODE(0, 0x4101) +#define TSDB_CODE_STREAM_EXEC_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x4102) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 14ca7f3b02..c6a424666c 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -155,7 +155,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr); -void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data); +void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t fVer); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3d1827e8bc..e2888e8e9a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -798,7 +798,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; + pTask->tbSink.tbSinkFunc = tqSinkDataIntoDstTable; int32_t ver1 = 1; SMetaInfo info = {0}; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a5a685580d..7f52750348 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -29,8 +29,18 @@ static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBloc static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, SSubmitTbData* pTableData); static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, - int64_t suid); + int64_t suid); static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); +static int32_t tsAscendingSortFn(const void* p1, const void* p2); +static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id); +static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, + const char* dstTableName, int64_t* uid); +static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id); +static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, + SSDataBlock* pDataBlock); +static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); +static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id); +static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -115,14 +125,6 @@ static bool tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSink return false; } -static int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { - if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { - return TSDB_CODE_FAILED; - } - - return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); -} - static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; @@ -246,7 +248,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S return code; } -static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) { +int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) { const char* id = pTask->id.idStr; int32_t vgId = TD_VID(pVnode); int32_t len = 0; @@ -283,7 +285,7 @@ static int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubm // merge the new submit table block with the existed blocks // if ts in the new data block overlap with existed one, replace it -static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) { +int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) { int32_t oldLen = taosArrayGetSize(pExisted->aRowP); int32_t newLen = taosArrayGetSize(pNew->aRowP); @@ -330,135 +332,6 @@ static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* return TSDB_CODE_SUCCESS; } -void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) { - const SArray* pBlocks = (const SArray*)data; - SVnode* pVnode = (SVnode*)vnode; - int64_t suid = pTask->tbSink.stbUid; - char* stbFullName = pTask->tbSink.stbFullName; - STSchema* pTSchema = pTask->tbSink.pTSchema; - int32_t vgId = TD_VID(pVnode); - int32_t numOfBlocks = taosArrayGetSize(pBlocks); - int32_t code = TSDB_CODE_SUCCESS; - const char* id = pTask->id.idStr; - - if (pTask->tsInfo.sinkStart == 0) { - pTask->tsInfo.sinkStart = taosGetTimestampMs(); - } - - bool onlySubmitData = true; - for(int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* p = taosArrayGet(pBlocks, i); - if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { - onlySubmitData = false; - break; - } - } - - if (!onlySubmitData) { - tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, - numOfBlocks); - - for(int32_t i = 0; i < numOfBlocks; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); - } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { - code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); - } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { - continue; - } else { - pTask->sinkRecorder.numOfBlocks += 1; - - SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; - if (submitReq.aSubmitTbData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); - return; - } - - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1); - } - } - } else { - tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks); - SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - - SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; - if (submitReq.aSubmitTbData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); - taosHashCleanup(pTableIndexMap); - return; - } - - bool hasSubmit = false; - for (int32_t i = 0; i < numOfBlocks; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_CHECKPOINT) { - continue; - } - - hasSubmit = true; - pTask->sinkRecorder.numOfBlocks += 1; - uint64_t groupId = pDataBlock->info.id.groupId; - - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; - - int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); - if (index == NULL) { // no data yet, append it - code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - taosArrayPush(submitReq.aSubmitTbData, &tbData); - - int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; - taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); - } else { - code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - - SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); - code = doMergeExistedRows(pExisted, &tbData, id); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - } - - pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; - } - - taosHashCleanup(pTableIndexMap); - - if (hasSubmit) { - doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks); - } else { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); - } - } -} - int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))}; @@ -498,22 +371,25 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* return TSDB_CODE_SUCCESS; } -static bool isValidDestChildTable(SMetaReader* pReader, int32_t vgId, char* ctbName, int64_t suid) { +bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid) { if (pReader->me.type != TSDB_CHILD_TABLE) { tqError("vgId:%d, failed to write into %s, since table type:%d incorrect", vgId, ctbName, pReader->me.type); + terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE; return false; } if (pReader->me.ctbEntry.suid != suid) { tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid:%" PRId64 ", actual:%" PRId64, vgId, ctbName, suid, pReader->me.ctbEntry.suid); + terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE; return false; } + terrno = 0; return true; } -static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { +SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock) { char* ctbName = pDataBlock->info.parTbName; SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)); @@ -570,11 +446,12 @@ static SVCreateTbReq* buildAutoCreateTableReq(char* stbFullName, int64_t suid, i return pCreateTbReq; } -static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, uint64_t uid, - const char* id) { - pTableSinkInfo->uid = uid; +int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { + if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) { + return TSDB_CODE_FAILED; + } - int32_t code = tqPutTableInfo(pSinkTableMap, groupId, pTableSinkInfo); + int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pTableSinkInfo); } else { @@ -625,7 +502,7 @@ int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int return TSDB_CODE_SUCCESS; } -static int32_t tsAscendingSortFn(const void* p1, const void* p2) { +int32_t tsAscendingSortFn(const void* p1, const void* p2) { SRow* pRow1 = *(SRow**) p1; SRow* pRow2 = *(SRow**) p2; @@ -636,7 +513,7 @@ static int32_t tsAscendingSortFn(const void* p1, const void* p2) { } } -static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { +int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock* pDataBlock, const char* id) { int32_t numOfRows = pDataBlock->info.rows; int32_t code = TSDB_CODE_SUCCESS; @@ -707,6 +584,46 @@ static int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDa return TSDB_CODE_SUCCESS; } +int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, + const char* dstTableName, int64_t* uid) { + int32_t vgId = TD_VID(pVnode); + int64_t suid = pTask->tbSink.stbUid; + const char* id = pTask->id.idStr; + + while (pTableSinkInfo->uid == 0) { + if (streamTaskShouldStop(&pTask->status)) { + tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); + return TSDB_CODE_STREAM_EXEC_CANCELLED; + } + + // wait for the table to be created + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, 0); + + int32_t code = metaGetTableEntryByName(&mr, dstTableName); + if (code == 0) { // table already exists, check its type and uid + bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); + if (isValid) { // not valid table, ignore it + tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); + ASSERT(terrno == 0); + + // set the destination table uid + (*uid) = mr.me.uid; + pTableSinkInfo->uid = mr.me.uid; + } + + metaReaderClear(&mr); + return terrno; + } else { // not exist, wait and retry + metaReaderClear(&mr); + taosMsleep(100); + tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData) { uint64_t groupId = pDataBlock->info.id.groupId; @@ -719,6 +636,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat STableSinkInfo* pTableSinkInfo = NULL; bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo); + if (alreadyCached) { if (dstTableName[0] == 0) { // data block does not set the destination table name tstrncpy(dstTableName, pTableSinkInfo->name.data, pTableSinkInfo->name.len + 1); @@ -741,6 +659,9 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat int32_t nameLen = strlen(dstTableName); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); + if (pTableSinkInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } pTableSinkInfo->name.len = nameLen; memcpy(pTableSinkInfo->name.data, dstTableName, nameLen); @@ -752,45 +673,16 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->uid == 0) { tqDebug("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); + return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); + } else { + tqDebug("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); } - - while (pTableSinkInfo->uid == 0) { - if (streamTaskShouldStop(&pTask->status)) { - tqDebug("s-task:%s task will stop, quit from waiting for table:%s create", id, dstTableName); - return TSDB_CODE_SUCCESS; - } - - // wait for the table to be created - SMetaReader mr = {0}; - metaReaderDoInit(&mr, pVnode->pMeta, 0); - - int32_t code = metaGetTableEntryByName(&mr, dstTableName); - if (code == 0) { // table already exists, check its type and uid - bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); - if (!isValid) { // not valid table, ignore it - metaReaderClear(&mr); - return TSDB_CODE_SUCCESS; - } else { - tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); - - pTableData->uid = mr.me.uid; - pTableSinkInfo->uid = mr.me.uid; - metaReaderClear(&mr); - } - } else { // not exist, wait and retry - metaReaderClear(&mr); - taosMsleep(100); - tqDebug("s-task:%s wait 100ms for the table:%s ready before insert data", id, dstTableName); - } - } - } else { - // todo: this check is not safe, and results in losing of submit message from WAL. // The auto-create option will always set to be open for those submit messages, which arrive during the period // the creating of the destination table, due to the absence of the user-specified table in TSDB. When scanning // data from WAL, those submit messages, with auto-created table option, will be discarded expect the first, for // those mismatched table uids. Only the FIRST table has the correct table uid, and those remain all have - // randomly generated false table uid in the WAL. + // randomly generated, but false table uid in the WAL. SMetaReader mr = {0}; metaReaderDoInit(&mr, pVnode->pMeta, 0); @@ -808,19 +700,22 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat return terrno; } - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, 0, id); + pTableSinkInfo->uid = 0; + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id); } else { - bool isValid = isValidDestChildTable(&mr, vgId, dstTableName, suid); + bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (!isValid) { metaReaderClear(&mr); taosMemoryFree(pTableSinkInfo); tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, dstTableName); - return TSDB_CODE_PAR_DUPLICATED_COLUMN; + return terrno; } else { pTableData->uid = mr.me.uid; + pTableSinkInfo->uid = mr.me.uid; + metaReaderClear(&mr); - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, pTableData->uid, id); + doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id); } } } @@ -848,3 +743,140 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo tqDebug("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows); return code; } + +void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { + const SArray* pBlocks = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + int64_t suid = pTask->tbSink.stbUid; + char* stbFullName = pTask->tbSink.stbFullName; + STSchema* pTSchema = pTask->tbSink.pTSchema; + int32_t vgId = TD_VID(pVnode); + int32_t numOfBlocks = taosArrayGetSize(pBlocks); + int32_t code = TSDB_CODE_SUCCESS; + const char* id = pTask->id.idStr; + + if (pTask->tsInfo.sinkStart == 0) { + pTask->tsInfo.sinkStart = taosGetTimestampMs(); + } + + bool onlySubmitData = true; + for(int32_t i = 0; i < numOfBlocks; ++i) { + SSDataBlock* p = taosArrayGet(pBlocks, i); + if (p->info.type == STREAM_DELETE_RESULT || p->info.type == STREAM_CREATE_CHILD_TABLE) { + onlySubmitData = false; + break; + } + } + + if (!onlySubmitData) { + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id, + numOfBlocks); + + for(int32_t i = 0; i < numOfBlocks; ++i) { + if (streamTaskShouldStop(&pTask->status)) { + return; + } + + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + code = doBuildAndSendDeleteMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); + } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } else { + pTask->sinkRecorder.numOfBlocks += 1; + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + return; + } + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + taosArrayPush(submitReq.aSubmitTbData, &tbData); + code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1); + } + } + } else { + tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks); + SHashObj* pTableIndexMap = taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + taosHashCleanup(pTableIndexMap); + return; + } + + bool hasSubmit = false; + for (int32_t i = 0; i < numOfBlocks; i++) { + if (streamTaskShouldStop(&pTask->status)) { + return; + } + + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } + + hasSubmit = true; + pTask->sinkRecorder.numOfBlocks += 1; + uint64_t groupId = pDataBlock->info.id.groupId; + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version}; + + int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); + if (index == NULL) { // no data yet, append it + code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + taosArrayPush(submitReq.aSubmitTbData, &tbData); + + int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; + taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); + } else { + code = setDstTableDataPayload(pTask, i, pDataBlock, &tbData); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); + code = doMergeExistedRows(pExisted, &tbData, id); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + } + + pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; + } + + taosHashCleanup(pTableIndexMap); + + if (hasSubmit) { + doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks); + } else { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); + } + } +} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 278b23b5a9..461ccc5dce 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -651,8 +651,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE,"Out of memory in stream queue") - +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")