enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info
This commit is contained in:
parent
79b645a93a
commit
e6bf8dcfde
|
@ -4049,18 +4049,17 @@ void tDeleteMqMetaRsp(SMqMetaRsp* pRsp);
|
|||
#define MQ_DATA_RSP_VERSION 100
|
||||
|
||||
typedef struct {
|
||||
struct {
|
||||
SMqRspHead head;
|
||||
STqOffsetVal rspOffset;
|
||||
STqOffsetVal reqOffset;
|
||||
int32_t blockNum;
|
||||
int8_t withTbName;
|
||||
int8_t withSchema;
|
||||
SArray* blockDataLen;
|
||||
SArray* blockData;
|
||||
SArray* blockTbName;
|
||||
SArray* blockSchema;
|
||||
};
|
||||
SMqRspHead head;
|
||||
STqOffsetVal rspOffset;
|
||||
STqOffsetVal reqOffset;
|
||||
int32_t blockNum;
|
||||
int8_t withTbName;
|
||||
int8_t withSchema;
|
||||
SArray* blockDataLen;
|
||||
SArray* blockData;
|
||||
SArray* blockTbName;
|
||||
SArray* blockSchema;
|
||||
SArray* blockSuid;
|
||||
|
||||
union{
|
||||
struct{
|
||||
|
|
|
@ -209,7 +209,8 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
|||
|
||||
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
|
||||
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
|
||||
const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo);
|
||||
|
||||
void* qExtractReaderFromStreamScanner(void* scanner);
|
||||
|
||||
|
|
|
@ -1856,6 +1856,42 @@ static threadlocal SHashObj* pCreateTbHash = NULL;
|
|||
static threadlocal SHashObj* pNameHash = NULL;
|
||||
static threadlocal SHashObj* pMetaHash = NULL;
|
||||
|
||||
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){
|
||||
char* p = (char*)rawData;
|
||||
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
|
||||
// column length |
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(int32_t);
|
||||
p += sizeof(uint64_t);
|
||||
int8_t* fields = p;
|
||||
|
||||
if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
|
||||
return true;
|
||||
}
|
||||
for (int i = 0; i < pSW->nCols; i++) {
|
||||
int j = 0;
|
||||
for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
|
||||
SSchema* pColSchema = &pTableMeta->schema[j];
|
||||
char* fieldName = pSW->pSchema[i].name;
|
||||
|
||||
if (strcmp(pColSchema->name, fieldName) == 0) {
|
||||
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
fields += sizeof(int8_t) + sizeof(int32_t);
|
||||
|
||||
if (j == pTableMeta->tableInfo.numOfColumns)
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) {
|
||||
if (taos == NULL || data == NULL) {
|
||||
SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data);
|
||||
|
@ -1905,7 +1941,7 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
RAW_NULL_CHECK(pNameHash);
|
||||
}
|
||||
if (pMetaHash == NULL){
|
||||
pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
RAW_NULL_CHECK(pMetaHash);
|
||||
taosHashSetFreeFp(pMetaHash, taosMemoryFree);
|
||||
}
|
||||
|
@ -1931,6 +1967,9 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
|
||||
RAW_NULL_CHECK(tbName);
|
||||
|
||||
int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter);
|
||||
RAW_NULL_CHECK(suid);
|
||||
|
||||
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
|
||||
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
|
||||
(void)strcpy(pName.dbname, pRequest->pDb);
|
||||
|
@ -1960,14 +1999,19 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
vgId = vg->vgId;
|
||||
}
|
||||
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pSW);
|
||||
void* rawData = getRawDataFromRes(pRetrieve);
|
||||
RAW_NULL_CHECK(rawData);
|
||||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, tbName, strlen(tbName));
|
||||
if (pTableMetaTmp == NULL) {
|
||||
STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, suid, POINTER_BYTES);
|
||||
if (pTableMetaTmp == NULL || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
|
||||
if (pCreateReqDst) { // change stable name to get meta
|
||||
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
|
||||
}
|
||||
RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
|
||||
code = taosHashPut(pMetaHash, tbName, strlen(tbName), &pTableMeta, POINTER_BYTES);
|
||||
code = taosHashPut(pMetaHash, suid, POINTER_BYTES, &pTableMeta, POINTER_BYTES);
|
||||
if (code != 0){
|
||||
taosMemoryFree(pTableMeta);
|
||||
goto end;
|
||||
|
@ -1981,9 +2025,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da
|
|||
pTableMeta = *pTableMetaTmp;
|
||||
}
|
||||
|
||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
|
||||
RAW_NULL_CHECK(pSW);
|
||||
void* rawData = getRawDataFromRes(pRetrieve);
|
||||
char err[ERR_MSG_LEN] = {0};
|
||||
code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -10716,12 +10716,42 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
if (pRsp->withTbName) {
|
||||
int64_t* suid = taosArrayGet(pRsp->blockSuid, i);
|
||||
if (suid != NULL){
|
||||
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid));
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
||||
TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp));
|
||||
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime));
|
||||
TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp));
|
||||
|
||||
return 0;
|
||||
}
|
||||
int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
if (pRsp->withTbName) {
|
||||
if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) {
|
||||
TAOS_CHECK_RETURN(terrno);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pRsp->blockNum; i++) {
|
||||
int64_t suid = 0;
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid));
|
||||
if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) {
|
||||
TAOS_CHECK_RETURN(terrno);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
@ -10798,6 +10828,9 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -10811,6 +10844,8 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
|
|||
pRsp->blockSchema = NULL;
|
||||
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
|
||||
pRsp->blockTbName = NULL;
|
||||
taosArrayDestroy(pRsp->blockSuid);
|
||||
pRsp->blockSuid = NULL;
|
||||
tOffsetDestroy(&pRsp->reqOffset);
|
||||
tOffsetDestroy(&pRsp->rspOffset);
|
||||
}
|
||||
|
@ -10830,6 +10865,8 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
|
|||
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen));
|
||||
}
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp));
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
@ -10860,6 +10897,9 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(pDecoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp));
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
|
|
|
@ -77,6 +77,14 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
|
|||
tqError("failed to push tbName to blockTbName:%s", tbName);
|
||||
continue;
|
||||
}
|
||||
int64_t suid = 0;
|
||||
if(mr.me.type == TSDB_CHILD_TABLE){
|
||||
suid = mr.me.ctbEntry.suid;
|
||||
}
|
||||
if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){
|
||||
tqError("failed to push suid to blockSuid:%"PRId64, suid);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
return 0;
|
||||
|
@ -210,36 +218,26 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
|
||||
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
|
||||
if (pRsp->withTbName) {
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
int64_t uid = pExec->pTqReader->lastBlkUid;
|
||||
if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) {
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
char* tbName = taosStrdup(qExtractTbnameFromTask(task));
|
||||
if (tbName == NULL) {
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
|
||||
return terrno;
|
||||
}
|
||||
if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
char* tbName = taosStrdup(qExtractTbnameFromTask(task));
|
||||
if (tbName == NULL) {
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
|
||||
return terrno;
|
||||
}
|
||||
if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
|
||||
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
int64_t suid = qExtractSuidFromTask(task);
|
||||
if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){
|
||||
tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (pRsp->withSchema) {
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
if (tqAddBlockSchemaToRsp(pExec, pRsp) != 0){
|
||||
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||
if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
|
||||
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||
if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
|
||||
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -249,12 +247,9 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
continue;
|
||||
}
|
||||
pRsp->blockNum++;
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
continue;
|
||||
} else {
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt <= tmqRowSize) continue;
|
||||
}
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt <= tmqRowSize) continue;
|
||||
|
||||
}
|
||||
|
||||
// get meta
|
||||
|
|
|
@ -50,8 +50,11 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
|||
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
|
||||
pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
|
||||
pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t));
|
||||
|
||||
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
|
||||
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL ||
|
||||
pRsp->blockTbName == NULL || pRsp->blockSchema == NULL ||
|
||||
pRsp->blockSuid == NULL) {
|
||||
if (pRsp->blockData != NULL) {
|
||||
taosArrayDestroy(pRsp->blockData);
|
||||
pRsp->blockData = NULL;
|
||||
|
@ -71,6 +74,11 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
|||
taosArrayDestroy(pRsp->blockSchema);
|
||||
pRsp->blockSchema = NULL;
|
||||
}
|
||||
|
||||
if (pRsp->blockSuid != NULL) {
|
||||
taosArrayDestroy(pRsp->blockSuid);
|
||||
pRsp->blockSuid = NULL;
|
||||
}
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@ typedef struct {
|
|||
SVersionRange fillHistoryVer;
|
||||
STimeWindow fillHistoryWindow;
|
||||
SStreamState* pState;
|
||||
int64_t suid; // for tmq
|
||||
} SStreamTaskInfo;
|
||||
|
||||
struct SExecTaskInfo {
|
||||
|
|
|
@ -1212,6 +1212,11 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
|
|||
return pTaskInfo->streamInfo.tbName;
|
||||
}
|
||||
|
||||
const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return pTaskInfo->streamInfo.suid;
|
||||
}
|
||||
|
||||
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return &pTaskInfo->streamInfo.btMetaRsp;
|
||||
|
@ -1494,6 +1499,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
|||
|
||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
|
||||
pTaskInfo->streamInfo.suid = mtInfo.suid;
|
||||
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
|
||||
pTaskInfo->streamInfo.schema = mtInfo.schema;
|
||||
|
||||
|
|
Loading…
Reference in New Issue