diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index a1964026e9..d721c7f659 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -627,13 +627,23 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData) { uint64_t groupId = pDataBlock->info.id.groupId; - char* dstTableName = pDataBlock->info.parTbName; int32_t numOfRows = pDataBlock->info.rows; const char* id = pTask->id.idStr; int64_t suid = pTask->outputInfo.tbSink.stbUid; STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t vgId = TD_VID(pVnode); STableSinkInfo* pTableSinkInfo = NULL; + char* dstTableName = NULL; + int32_t code = 0; + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); + if(dstTableName == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(dstTableName, pDataBlock->info.parTbName); + }else{ + dstTableName = pDataBlock->info.parTbName; + } bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); @@ -664,7 +674,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat int32_t nameLen = strlen(dstTableName); pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen); if (pTableSinkInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } pTableSinkInfo->name.len = nameLen; @@ -677,7 +688,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->uid == 0) { tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); - return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); + code = doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); + goto END; } else { tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid); } @@ -705,8 +717,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); - taosMemoryFree(pTableSinkInfo); - return terrno; + code = terrno; + goto END; } pTableSinkInfo->uid = 0; @@ -715,10 +727,10 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (!isValid) { metaReaderClear(&mr); - taosMemoryFree(pTableSinkInfo); tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId, dstTableName); - return terrno; + code = terrno; + goto END; } else { pTableData->uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; @@ -729,7 +741,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } } - return TSDB_CODE_SUCCESS; +END: + if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + taosMemoryFree(dstTableName); + } + taosMemoryFree(pTableSinkInfo); + return code; } int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7252c258de..a9b83a148e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -18,11 +18,6 @@ #include "trpc.h" #include "ttimer.h" -typedef struct SBlockName { - uint32_t hashValue; - char parTbName[TSDB_TABLE_NAME_LEN]; -} SBlockName; - typedef struct { int32_t upStreamTaskId; SEpSet upstreamNodeEpset; @@ -537,40 +532,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); if (pVal) { - SBlockName* pBln = (SBlockName*)pVal; - hashValue = pBln->hashValue; - if (!pDataBlock->info.parTbName[0]) { - memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); - } + hashValue = *(uint32_t*)pVal; } else { - char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); - if (ctbName == NULL) { - return -1; - } - + char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; + char parTbName[TSDB_TABLE_NAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId); + strcpy(parTbName, pDataBlock->info.parTbName); + buildCtbNameAddGruopId(parTbName, groupId); } - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, - pDataBlock->info.parTbName); } else { - buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, - pDataBlock->info.parTbName); + buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, parTbName); } - + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, parTbName); /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; - hashValue = - taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); - taosMemoryFree(ctbName); - SBlockName bln = {0}; - bln.hashValue = hashValue; - memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); + hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { - tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &hashValue, sizeof(uint32_t)); } }