Merge pull request #21632 from taosdata/fix/TD-24008
opt stream block dispatch
This commit is contained in:
commit
d7f9857a30
|
@ -248,6 +248,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
|
||||||
tb_uid_t suid);
|
tb_uid_t suid);
|
||||||
|
|
||||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||||
|
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
|
||||||
|
|
||||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||||
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
||||||
|
|
|
@ -327,6 +327,7 @@ struct SStreamTask {
|
||||||
int64_t checkpointingId;
|
int64_t checkpointingId;
|
||||||
int32_t checkpointAlignCnt;
|
int32_t checkpointAlignCnt;
|
||||||
struct SStreamMeta* pMeta;
|
struct SStreamMeta* pMeta;
|
||||||
|
SSHashObj* pNameMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
// meta
|
// meta
|
||||||
|
|
|
@ -2465,19 +2465,31 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
||||||
if (stbFullName[0] == 0) {
|
char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
|
||||||
|
if (!pBuf) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pBuf;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
|
||||||
|
if (stbFullName[0] == 0) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
|
SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
|
||||||
if (tags == NULL) {
|
if (tags == NULL) {
|
||||||
return NULL;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
|
|
||||||
if (cname == NULL) {
|
if (cname == NULL) {
|
||||||
taosArrayDestroy(tags);
|
taosArrayDestroy(tags);
|
||||||
return NULL;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlKv pTag = {.key = "group_id",
|
SSmlKv pTag = {.key = "group_id",
|
||||||
|
@ -2499,9 +2511,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
|
||||||
taosArrayDestroy(tags);
|
taosArrayDestroy(tags);
|
||||||
|
|
||||||
if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
|
if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
|
||||||
return NULL;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
return rname.ctbShortName;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||||
|
|
|
@ -298,10 +298,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
|
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
|
||||||
} else {
|
} else {
|
||||||
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
|
buildCtbNameByGroupIdImpl(stbFullName, pDataBlock->info.id.groupId, ctbName);
|
||||||
memcpy(ctbName, tmp, strlen(tmp));
|
memcpy(pTableSinkInfo->tbName, ctbName, strlen(ctbName));
|
||||||
memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
|
|
||||||
taosMemoryFree(tmp);
|
|
||||||
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
|
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
|
||||||
pDataBlock->info.id.groupId);
|
pDataBlock->info.id.groupId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,13 @@
|
||||||
|
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
|
#define MAX_BLOCK_NAME_NUM 1024
|
||||||
|
|
||||||
|
typedef struct SBlockName {
|
||||||
|
uint32_t hashValue;
|
||||||
|
char parTbName[TSDB_TABLE_NAME_LEN];
|
||||||
|
} SBlockName;
|
||||||
|
|
||||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
@ -331,6 +338,20 @@ FAIL:
|
||||||
|
|
||||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
|
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
|
||||||
int64_t groupId) {
|
int64_t groupId) {
|
||||||
|
uint32_t hashValue = 0;
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
if (pTask->pNameMap == NULL) {
|
||||||
|
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
|
||||||
|
if (pVal) {
|
||||||
|
SBlockName* pBln = (SBlockName*)pVal;
|
||||||
|
hashValue = pBln->hashValue;
|
||||||
|
if (!pDataBlock->info.parTbName[0]) {
|
||||||
|
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
|
||||||
if (ctbName == NULL) {
|
if (ctbName == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -339,18 +360,24 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
||||||
if (pDataBlock->info.parTbName[0]) {
|
if (pDataBlock->info.parTbName[0]) {
|
||||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
||||||
} else {
|
} else {
|
||||||
char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId);
|
buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
||||||
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, ctbShortName);
|
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
|
||||||
taosMemoryFree(ctbShortName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
|
||||||
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
|
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
|
||||||
uint32_t hashValue =
|
hashValue =
|
||||||
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
||||||
taosMemoryFree(ctbName);
|
taosMemoryFree(ctbName);
|
||||||
|
SBlockName bln = {0};
|
||||||
|
bln.hashValue = hashValue;
|
||||||
|
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
|
||||||
|
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
|
||||||
|
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool found = false;
|
bool found = false;
|
||||||
// TODO: optimize search
|
// TODO: optimize search
|
||||||
|
|
|
@ -224,5 +224,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
taosMemoryFree((void*)pTask->id.idStr);
|
taosMemoryFree((void*)pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->pNameMap) {
|
||||||
|
tSimpleHashCleanup(pTask->pNameMap);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue