fix: assign uid for rsma by physical plan
This commit is contained in:
parent
3e2c24d987
commit
0dcb3a5da1
|
@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
|||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type,
|
||||
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type,
|
||||
pDataBlock->info.childId, pDataBlock->info.groupId);
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
printf("%s |", flag);
|
||||
|
|
|
@ -599,14 +599,14 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
|
|||
SSubmitReq *pReq = NULL;
|
||||
// TODO: the schema update should be handled
|
||||
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
|
||||
smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma),
|
||||
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
|
||||
suid, pItem->level, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||
taosMemoryFreeClear(pReq);
|
||||
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||
SMA_VID(pSma), suid, pItem->level, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -318,6 +318,7 @@ typedef struct STableScanInfo {
|
|||
int32_t currentTable;
|
||||
int8_t scanMode;
|
||||
int8_t noTable;
|
||||
int8_t assignBlockUid;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
|
|
@ -270,6 +270,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
}
|
||||
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||
if (pTaskInfo->tableqinfoList.map == NULL) {
|
||||
pTaskInfo->tableqinfoList.map =
|
||||
taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
}
|
||||
|
||||
if (keyBuf != NULL) {
|
||||
|
|
|
@ -408,6 +408,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
pBlock->info.groupId = *groupId;
|
||||
}
|
||||
|
||||
if (pTableScanInfo->assignBlockUid) {
|
||||
pBlock->info.groupId = pBlock->info.uid;
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
|
@ -616,6 +620,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->currentGroupId = -1;
|
||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||
|
||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
|
|
|
@ -1522,6 +1522,7 @@ static const char* jkTableScanPhysiPlanWatermark = "Watermark";
|
|||
static const char* jkTableScanPhysiPlanIgnoreExpired = "IgnoreExpired";
|
||||
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
|
||||
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
|
||||
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1578,6 +1579,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1637,6 +1641,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -4518,7 +4525,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
return jsonToPhysiScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
|
||||
|
||||
return jsonToPhysiLastRowScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
|
||||
|
|
Loading…
Reference in New Issue