diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 58c1707e1f..a189cee0bb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -236,7 +236,7 @@ typedef struct { void* vnode; // not available to encoder and decoder FTbSink* tbSinkFunc; STSchema* pTSchema; - SSHashObj* pTblInfo; + SSHashObj* pTbInfo; } STaskSinkTb; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a37a9787c9..b75baea08d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -746,13 +746,13 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV return terrno; } - pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (pOutputInfo->tbSink.pTblInfo == NULL) { + pOutputInfo->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (pOutputInfo->tbSink.pTbInfo == NULL) { tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno)); return terrno; } - tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr); + tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTbInfo, freePtr); } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 6daa9213aa..be41f7e99e 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -18,6 +18,8 @@ #include "tmsg.h" #include "tq.h" +#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1)) + typedef struct STableSinkInfo { uint64_t uid; tstr name; @@ -35,16 +37,22 @@ static int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema int64_t earlyTs, const char* id); static int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, const char* dstTableName, int64_t* uid); -static int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, - const char* id); -static int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id); + static bool isValidDstChildTable(SMetaReader* pReader, int32_t vgId, const char* ctbName, int64_t suid); static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags); static int32_t createDefaultTagColName(SArray** pColNameList); -static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid, bool newSubTableRule); -static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo); +static int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, + const char* stbFullName, int64_t gid, bool newSubTableRule); +static int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo); +static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, + const char* id); +static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo); +static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id); +static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode); +static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); +static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, + int64_t earlyTs); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr, bool newSubTableRule) { @@ -81,7 +89,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); name[varDataLen(varTbName)] = '\0'; - if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && stbFullName) { + if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 && + stbFullName) { int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap); if (code != TSDB_CODE_SUCCESS) { return code; @@ -161,16 +170,6 @@ end: return ret; } -static bool tqGetTableInfo(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) { - void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); - if (pVal) { - *pInfo = *(STableSinkInfo**)pVal; - return true; - } - - return false; -} - static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { void* buf = NULL; int32_t tlen = 0; @@ -201,7 +200,7 @@ int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const int32_t code = tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (code == 0) { pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); - if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code + if (pCreateTableReq->ctb.stbName == NULL) { // ignore this error code tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName); code = terrno; } @@ -231,7 +230,7 @@ int32_t createDefaultTagColName(SArray** pColNameList) { } int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, - int64_t gid, bool newSubTableRule) { + int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName, gid) && gid != 0 && stbFullName) { @@ -245,18 +244,17 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* if (code != TSDB_CODE_SUCCESS) { return code; } -// tqDebug("gen name from:%s", pDataBlock->info.parTbName); + // tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); if (pCreateTableReq->name == NULL) { return terrno; } -// tqDebug("copy name:%s", pDataBlock->info.parTbName); + // tqDebug("copy name:%s", pDataBlock->info.parTbName); } } else { int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name); return code; -// tqDebug("gen name from stbFullName:%s gid:%"PRId64, stbFullName, gid); } return 0; @@ -264,16 +262,20 @@ int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid) { - STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; - int32_t rows = pDataBlock->info.rows; - SArray* tagArray = taosArrayInit(4, sizeof(STagVal)); - const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; - int32_t code = 0; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; + int32_t rows = pDataBlock->info.rows; + SArray* tagArray = NULL; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t code = 0; + STableSinkInfo* pInfo = NULL; + SVCreateTbBatchReq reqs = {0}; + SArray* crTblArray = NULL; tqDebug("s-task:%s build create %d table(s) msg", id, rows); - SVCreateTbBatchReq reqs = {0}; - SArray* crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); + + tagArray = taosArrayInit(4, sizeof(STagVal)); + crTblArray = reqs.pArray = taosArrayInit(1, sizeof(SVCreateTbReq)); if ((NULL == reqs.pArray) || (tagArray == NULL)) { tqError("s-task:%s failed to init create table msg, code:%s", id, tstrerror(terrno)); code = terrno; @@ -291,6 +293,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S tqError("s-task:%s vgId:%d failed to init create table msg", id, vgId); continue; } + taosArrayClear(tagArray); if (size == 2) { @@ -356,8 +359,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S } } - code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, - pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); + code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, IS_NEW_SUBTB_RULE(pTask)); if (code) { goto _end; } @@ -368,16 +370,15 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S goto _end; } - STableSinkInfo* pInfo = NULL; - bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo); + bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, gid, &pInfo); if (!alreadyCached) { - code = doCreateSinkInfo(pCreateTbReq->name, &pInfo); + code = doCreateSinkTableInfo(pCreateTbReq->name, &pInfo); if (code) { tqError("vgId:%d failed to create sink tableInfo for table:%s, s-task:%s", vgId, pCreateTbReq->name, id); continue; } - code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, id); + code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pInfo, gid, id); if (code) { tqError("vgId:%d failed to put sink tableInfo:%s into cache, s-task:%s", vgId, pCreateTbReq->name, id); } @@ -465,45 +466,45 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c k += 1; } else { - // check for the existance of primary key - if (pNewRow->numOfPKs == 0) { + // check for the existance of primary key + if (pNewRow->numOfPKs == 0) { + void* p = taosArrayPush(pFinal, &pNewRow); + if (p == NULL) { + return terrno; + } + + k += 1; + j += 1; + tRowDestroy(pOldRow); + } else { + numOfPk = pNewRow->numOfPKs; + + SRowKey kNew, kOld; + tRowGetKey(pNewRow, &kNew); + tRowGetKey(pOldRow, &kOld); + + int32_t ret = tRowKeyCompare(&kNew, &kOld); + if (ret <= 0) { void* p = taosArrayPush(pFinal, &pNewRow); if (p == NULL) { return terrno; } - k += 1; j += 1; - tRowDestroy(pOldRow); - } else { - numOfPk = pNewRow->numOfPKs; - - SRowKey kNew, kOld; - tRowGetKey(pNewRow, &kNew); - tRowGetKey(pOldRow, &kOld); - - int32_t ret = tRowKeyCompare(&kNew, &kOld); - if (ret <= 0) { - void* p = taosArrayPush(pFinal, &pNewRow); - if (p == NULL) { - return terrno; - } - - j += 1; - - if (ret == 0) { - k += 1; - tRowDestroy(pOldRow); - } - } else { - void* p = taosArrayPush(pFinal, &pOldRow); - if (p == NULL) { - return terrno; - } + if (ret == 0) { k += 1; + tRowDestroy(pOldRow); } + } else { + void* p = taosArrayPush(pFinal, &pOldRow); + if (p == NULL) { + return terrno; + } + + k += 1; } + } } } @@ -527,8 +528,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c taosArrayDestroy(pExisted->aRowP); pExisted->aRowP = pFinal; - tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", - id, (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL), + tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id, + (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL), (pNew->pCreateTbReq != NULL)); tdDestroySVCreateTbReq(pNew->pCreateTbReq); @@ -727,7 +728,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat dataIndex++; } else { void* colData = colDataGetData(pColData, j); - if (IS_VAR_DATA_TYPE(pCol->type)) { // address copy, no value + if (IS_VAR_DATA_TYPE(pCol->type)) { // address copy, no value SValue sv = (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)}; SColVal cv = COL_VAL_VALUE(pCol->colId, sv); @@ -806,7 +807,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI return TSDB_CODE_SUCCESS; } -int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) { +int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) { int32_t nameLen = strlen(pDstTableName); (*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1); if (*pInfo == NULL) { @@ -830,7 +831,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat STableSinkInfo* pTableSinkInfo = NULL; int32_t code = 0; - bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); + bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, groupId, &pTableSinkInfo); if (alreadyCached) { if (dstTableName[0] == 0) { // data block does not set the destination table name @@ -870,7 +871,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } } - code = doCreateSinkInfo(dstTableName, &pTableSinkInfo); + code = doCreateSinkTableInfo(dstTableName, &pTableSinkInfo); if (code == 0) { tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName); } else { @@ -906,14 +907,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat SArray* pTagArray = taosArrayInit(pTSchema->numOfCols + 1, sizeof(STagVal)); if (pTagArray == NULL) { + tqError("s-task:%s failed to build auto create submit msg in sink, vgId:%d, due to %s", id, vgId, + tstrerror(terrno)); return terrno; } pTableData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - code = - buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, - (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1), - &pTableData->pCreateTbReq); + code = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock, pTagArray, + IS_NEW_SUBTB_RULE(pTask), &pTableData->pCreateTbReq); taosArrayDestroy(pTagArray); if (code) { @@ -923,12 +924,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } pTableSinkInfo->uid = 0; - code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id); + code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id); } else { metaReaderClear(&mr); - tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, - vgId, dstTableName); + tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId, + dstTableName); return TSDB_CODE_TDB_TABLE_NOT_EXIST; } } else { @@ -944,7 +945,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableSinkInfo->uid = mr.me.uid; metaReaderClear(&mr); - code = doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id); + code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id); } } } @@ -952,8 +953,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat return code; } -int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData, int64_t earlyTs, const char* id) { +int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, + SSubmitTbData* pTableData, int64_t earlyTs, const char* id) { int32_t numOfRows = pDataBlock->info.rows; char* dstTableName = pDataBlock->info.parTbName; @@ -975,6 +976,43 @@ int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_ return code; } +int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode) { + int32_t code = TSDB_CODE_SUCCESS; + const char* id = pTask->id.idStr; + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; + int32_t vgId = pTask->pMeta->vgId; + + if (pTask->outputInfo.tbSink.pTagSchema == NULL) { + SMetaReader mer1 = {0}; + metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); + + code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); + if (code != TSDB_CODE_SUCCESS) { + tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId); + metaReaderClear(&mer1); + return code; + } + + pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); + metaReaderClear(&mer1); + + if (pOutputInfo->tbSink.pTagSchema == NULL) { + tqError("s-task:%s failed to clone tag schema, code:%s, failed to sink results", id, tstrerror(terrno)); + return terrno; + } + + SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema; + SSchema* pCol1 = &pTagSchema->pSchema[0]; + if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) { + pOutputInfo->tbSink.autoCreateCtb = true; + } else { + pOutputInfo->tbSink.autoCreateCtb = false; + } + } + + return code; +} + void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -988,27 +1026,9 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { int64_t earlyTs = tsdbGetEarliestTs(pVnode->pTsdb); STaskOutputInfo* pOutputInfo = &pTask->outputInfo; - if (pTask->outputInfo.tbSink.pTagSchema == NULL) { - SMetaReader mer1 = {0}; - metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK); - - code = metaReaderGetTableEntryByUid(&mer1, pOutputInfo->tbSink.stbUid); - if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s vgId:%d failed to get the dst stable, failed to sink results", id, vgId); - metaReaderClear(&mer1); - return; - } - - pOutputInfo->tbSink.pTagSchema = tCloneSSchemaWrapper(&mer1.me.stbEntry.schemaTag); - metaReaderClear(&mer1); - - SSchemaWrapper* pTagSchema = pOutputInfo->tbSink.pTagSchema; - SSchema* pCol1 = &pTagSchema->pSchema[0]; - if (pTagSchema->nCols == 1 && pCol1->type == TSDB_DATA_TYPE_UBIGINT && strcmp(pCol1->name, "group_id") == 0) { - pOutputInfo->tbSink.autoCreateCtb = true; - } else { - pOutputInfo->tbSink.autoCreateCtb = false; - } + code = checkTagSchema(pTask, pVnode); + if (code != TSDB_CODE_SUCCESS) { + return; } bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks); @@ -1033,144 +1053,16 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; } else { - pTask->execInfo.sink.numOfBlocks += 1; - - SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; - if (submitReq.aSubmitTbData == NULL) { - code = terrno; - tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); - return; - } - - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; - code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName); - continue; - } - - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); - if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { - if (tbData.pCreateTbReq != NULL) { - tdDestroySVCreateTbReq(tbData.pCreateTbReq); - (void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, pDataBlock->info.id.groupId, id); - tbData.pCreateTbReq = NULL; - } - continue; - } - - void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData); - if (p == NULL) { - tqDebug("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id); - } - - code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1); - if (code) { // failed and continue - tqDebug("vgId:%d, s-task:%s submit msg failed, data lost", vgId, id); - } + code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs); } } } else { tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, merge submit msg", vgId, id, numOfBlocks); - SHashObj* pTableIndexMap = - taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - - SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; - if (submitReq.aSubmitTbData == NULL) { - code = terrno; - tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); - taosHashCleanup(pTableIndexMap); + if (streamTaskShouldStop(pTask)) { return; } - bool hasSubmit = false; - for (int32_t i = 0; i < numOfBlocks; i++) { - if (streamTaskShouldStop(pTask)) { - taosHashCleanup(pTableIndexMap); - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - return; - } - - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock == NULL) { - continue; - } - - if (pDataBlock->info.type == STREAM_CHECKPOINT) { - continue; - } - - hasSubmit = true; - pTask->execInfo.sink.numOfBlocks += 1; - uint64_t groupId = pDataBlock->info.id.groupId; - - SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; - - int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); - if (index == NULL) { // no data yet, append it - code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId); - continue; - } - - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); - if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { - if (tbData.pCreateTbReq != NULL) { - tdDestroySVCreateTbReq(tbData.pCreateTbReq); - (void) doRemoveFromCache(pTask->outputInfo.tbSink.pTblInfo, groupId, id); - tbData.pCreateTbReq = NULL; - } - continue; - } - - void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData); - if (p == NULL) { - tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id); - continue; - } - - int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; - code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); - if (code) { - tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code)); - continue; - } - } else { - code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); - if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { - if (tbData.pCreateTbReq != NULL) { - tdDestroySVCreateTbReq(tbData.pCreateTbReq); - tbData.pCreateTbReq = NULL; - } - continue; - } - - SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); - if (pExisted == NULL) { - continue; - } - - code = doMergeExistedRows(pExisted, &tbData, id); - if (code != TSDB_CODE_SUCCESS) { - continue; - } - } - - pTask->execInfo.sink.numOfRows += pDataBlock->info.rows; - } - - taosHashCleanup(pTableIndexMap); - - if (hasSubmit) { - code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks); - if (code) { // failed and continue - tqError("vgId:%d failed to build and send submit msg", vgId); - } - } else { - tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); - tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); - } + reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs); } } @@ -1190,7 +1082,7 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { return true; } -int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { +int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pTableSinkInfo); @@ -1202,7 +1094,17 @@ int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, return code; } -int32_t doRemoveFromCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) { +bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo) { + void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); + if (pVal) { + *pInfo = *(STableSinkInfo**)pVal; + return true; + } + + return false; +} + +int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id) { if (tSimpleHashGetSize(pSinkTableMap) == 0) { return TSDB_CODE_SUCCESS; } @@ -1223,8 +1125,8 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* return terrno; } - int32_t code = tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, - pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); + int32_t code = + tqBuildDeleteReq(pVnode->pTq, stbFullName, pDataBlock, &deleteReq, pTask->id.idStr, IS_NEW_SUBTB_RULE(pTask)); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1262,3 +1164,155 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* return TSDB_CODE_SUCCESS; } + +void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) { + int32_t code = 0; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; + int32_t numOfBlocks = taosArrayGetSize(pBlocks); + int64_t suid = pTask->outputInfo.tbSink.stbUid; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; + char* stbFullName = pTask->outputInfo.tbSink.stbFullName; + + SHashObj* pTableIndexMap = + taosHashInit(numOfBlocks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + code = terrno; + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(code)); + taosHashCleanup(pTableIndexMap); + return; + } + + bool hasSubmit = false; + for (int32_t i = 0; i < numOfBlocks; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock == NULL) { + continue; + } + + if (pDataBlock->info.type == STREAM_CHECKPOINT) { + continue; + } + + hasSubmit = true; + pTask->execInfo.sink.numOfBlocks += 1; + uint64_t groupId = pDataBlock->info.id.groupId; + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; + + int32_t* index = taosHashGet(pTableIndexMap, &groupId, sizeof(groupId)); + if (index == NULL) { // no data yet, append it + code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d dst-table gid:%" PRId64 " not exist, discard stream results", vgId, groupId); + continue; + } + + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); + if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { + if (tbData.pCreateTbReq != NULL) { + tdDestroySVCreateTbReq(tbData.pCreateTbReq); + (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, groupId, id); + tbData.pCreateTbReq = NULL; + } + continue; + } + + void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData); + if (p == NULL) { + tqError("vgId:%d, s-task:%s failed to build submit msg, data lost", vgId, id); + continue; + } + + int32_t size = (int32_t)taosArrayGetSize(submitReq.aSubmitTbData) - 1; + code = taosHashPut(pTableIndexMap, &groupId, sizeof(groupId), &size, sizeof(size)); + if (code) { + tqError("vgId:%d, s-task:%s failed to put group into index map, code:%s", vgId, id, tstrerror(code)); + continue; + } + } else { + code = tqSetDstTableDataPayload(suid, pTSchema, i, pDataBlock, &tbData, earlyTs, id); + if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { + if (tbData.pCreateTbReq != NULL) { + tdDestroySVCreateTbReq(tbData.pCreateTbReq); + tbData.pCreateTbReq = NULL; + } + continue; + } + + SSubmitTbData* pExisted = taosArrayGet(submitReq.aSubmitTbData, *index); + if (pExisted == NULL) { + continue; + } + + code = doMergeExistedRows(pExisted, &tbData, id); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + } + + pTask->execInfo.sink.numOfRows += pDataBlock->info.rows; + } + + taosHashCleanup(pTableIndexMap); + + if (hasSubmit) { + code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, numOfBlocks); + if (code) { // failed and continue + tqError("vgId:%d failed to build and send submit msg", vgId); + } + } else { + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); + tqDebug("vgId:%d, s-task:%s write results completed", vgId, id); + } +} + +int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs) { + int32_t code = 0; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; + int64_t suid = pTask->outputInfo.tbSink.stbUid; + const char* id = pTask->id.idStr; + int32_t vgId = TD_VID(pVnode); + char* stbFullName = pTask->outputInfo.tbSink.stbFullName; + + pTask->execInfo.sink.numOfBlocks += 1; + + SSubmitReq2 submitReq = {.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData))}; + if (submitReq.aSubmitTbData == NULL) { + tqError("s-task:%s vgId:%d failed to prepare submit msg in sink task, code:%s", id, vgId, tstrerror(terrno)); + return terrno; + } + + SSubmitTbData tbData = {.suid = suid, .uid = 0, .sver = pTSchema->version, .flags = TD_REQ_FROM_APP}; + code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName); + return code; + } + + code = tqSetDstTableDataPayload(suid, pTSchema, index, pDataBlock, &tbData, earlyTs, id); + if (code != TSDB_CODE_SUCCESS || tbData.aRowP == NULL) { + if (tbData.pCreateTbReq != NULL) { + tdDestroySVCreateTbReq(tbData.pCreateTbReq); + (void)doRemoveSinkTableInfoInCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, id); + tbData.pCreateTbReq = NULL; + } + + return code; + } + + void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData); + if (p == NULL) { + tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno)); + return terrno; + } + + code = doBuildAndSendSubmitMsg(pVnode, pTask, &submitReq, 1); + if (code) { // failed and continue + tqDebug("vgId:%d, s-task:%s submit msg failed, code:%s data lost", vgId, id, tstrerror(code)); + } + + return code; +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 727701e03e..b359cdfc81 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -289,7 +289,7 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pSchemaWrapper); taosMemoryFree(pTask->outputInfo.tbSink.pTSchema); - tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo); + tSimpleHashCleanup(pTask->outputInfo.tbSink.pTbInfo); tDeleteSchemaWrapper(pTask->outputInfo.tbSink.pTagSchema); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);