fix:[TD-28032]concat subtable and hash val as new subtable name to avoid data from multi time line write to one time line
This commit is contained in:
parent
8add4dc177
commit
9ba2f71487
|
@ -627,13 +627,23 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
|
||||||
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
|
||||||
SSubmitTbData* pTableData) {
|
SSubmitTbData* pTableData) {
|
||||||
uint64_t groupId = pDataBlock->info.id.groupId;
|
uint64_t groupId = pDataBlock->info.id.groupId;
|
||||||
char* dstTableName = pDataBlock->info.parTbName;
|
|
||||||
int32_t numOfRows = pDataBlock->info.rows;
|
int32_t numOfRows = pDataBlock->info.rows;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
int64_t suid = pTask->outputInfo.tbSink.stbUid;
|
||||||
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema;
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
STableSinkInfo* pTableSinkInfo = NULL;
|
STableSinkInfo* pTableSinkInfo = NULL;
|
||||||
|
char* dstTableName = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||||
|
if(dstTableName == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
strcpy(dstTableName, pDataBlock->info.parTbName);
|
||||||
|
}else{
|
||||||
|
dstTableName = pDataBlock->info.parTbName;
|
||||||
|
}
|
||||||
|
|
||||||
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
|
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo);
|
||||||
|
|
||||||
|
@ -664,7 +674,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
int32_t nameLen = strlen(dstTableName);
|
int32_t nameLen = strlen(dstTableName);
|
||||||
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
|
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen);
|
||||||
if (pTableSinkInfo == NULL) {
|
if (pTableSinkInfo == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableSinkInfo->name.len = nameLen;
|
pTableSinkInfo->name.len = nameLen;
|
||||||
|
@ -677,7 +688,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
|
|
||||||
if (pTableData->uid == 0) {
|
if (pTableData->uid == 0) {
|
||||||
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
|
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
|
||||||
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
|
code = doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
|
||||||
|
goto END;
|
||||||
} else {
|
} else {
|
||||||
tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid);
|
tqTrace("s-task:%s set the dstTable uid from cache:%"PRId64, id, pTableData->uid);
|
||||||
}
|
}
|
||||||
|
@ -705,8 +717,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
|
|
||||||
if (pTableData->pCreateTbReq == NULL) {
|
if (pTableData->pCreateTbReq == NULL) {
|
||||||
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
|
tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno));
|
||||||
taosMemoryFree(pTableSinkInfo);
|
code = terrno;
|
||||||
return terrno;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableSinkInfo->uid = 0;
|
pTableSinkInfo->uid = 0;
|
||||||
|
@ -715,10 +727,10 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
|
||||||
if (!isValid) {
|
if (!isValid) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(pTableSinkInfo);
|
|
||||||
tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
|
tqError("s-task:%s vgId:%d table:%s already exists, but not child table, stream results is discarded", id, vgId,
|
||||||
dstTableName);
|
dstTableName);
|
||||||
return terrno;
|
code = terrno;
|
||||||
|
goto END;
|
||||||
} else {
|
} else {
|
||||||
pTableData->uid = mr.me.uid;
|
pTableData->uid = mr.me.uid;
|
||||||
pTableSinkInfo->uid = mr.me.uid;
|
pTableSinkInfo->uid = mr.me.uid;
|
||||||
|
@ -729,7 +741,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
END:
|
||||||
|
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
taosMemoryFree(dstTableName);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pTableSinkInfo);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
int32_t setDstTableDataPayload(uint64_t suid, const STSchema *pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
|
||||||
|
|
|
@ -18,11 +18,6 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
typedef struct SBlockName {
|
|
||||||
uint32_t hashValue;
|
|
||||||
char parTbName[TSDB_TABLE_NAME_LEN];
|
|
||||||
} SBlockName;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t upStreamTaskId;
|
int32_t upStreamTaskId;
|
||||||
SEpSet upstreamNodeEpset;
|
SEpSet upstreamNodeEpset;
|
||||||
|
@ -537,40 +532,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
|
|
||||||
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
|
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
|
||||||
if (pVal) {
|
if (pVal) {
|
||||||
SBlockName* pBln = (SBlockName*)pVal;
|
hashValue = *(uint32_t*)pVal;
|
||||||
hashValue = pBln->hashValue;
|
|
||||||
if (!pDataBlock->info.parTbName[0]) {
|
|
||||||
memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
|
||||||
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||||
if (ctbName == NULL) {
|
char parTbName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
buildCtbNameAddGruopId(pDataBlock->info.parTbName, groupId);
|
strcpy(parTbName, pDataBlock->info.parTbName);
|
||||||
|
buildCtbNameAddGruopId(parTbName, groupId);
|
||||||
}
|
}
|
||||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
|
||||||
pDataBlock->info.parTbName);
|
|
||||||
} else {
|
} else {
|
||||||
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, parTbName);
|
||||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db,
|
|
||||||
pDataBlock->info.parTbName);
|
|
||||||
}
|
}
|
||||||
|
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, parTbName);
|
||||||
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
||||||
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
|
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
|
||||||
hashValue =
|
hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
||||||
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
|
||||||
taosMemoryFree(ctbName);
|
|
||||||
SBlockName bln = {0};
|
|
||||||
bln.hashValue = hashValue;
|
|
||||||
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
|
|
||||||
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
|
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
|
||||||
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
|
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &hashValue, sizeof(uint32_t));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue