Merge pull request #15519 from taosdata/feature/TD-11274-3.0
fix: assign uid for rsma by physical plan
This commit is contained in:
commit
f7b0476c89
|
@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
|
|||
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type,
|
||||
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type,
|
||||
pDataBlock->info.childId, pDataBlock->info.groupId);
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
printf("%s |", flag);
|
||||
|
|
|
@ -599,14 +599,14 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
|
|||
SSubmitReq *pReq = NULL;
|
||||
// TODO: the schema update should be handled
|
||||
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
|
||||
smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma),
|
||||
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
|
||||
suid, pItem->level, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||
taosMemoryFreeClear(pReq);
|
||||
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
|
||||
SMA_VID(pSma), suid, pItem->level, terrstr());
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,288 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sma.h"
|
||||
|
||||
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData);
|
||||
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
|
||||
// SRsmaSnapReader ========================================
|
||||
struct SRsmaSnapReader {
|
||||
SSma* pSma;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
|
||||
// for data file
|
||||
int8_t rsmaDataDone[TSDB_RETENTION_L2];
|
||||
STsdbSnapReader* pDataReader[TSDB_RETENTION_L2];
|
||||
|
||||
// for qtaskinfo file
|
||||
int8_t qTaskDone;
|
||||
SQTaskFReader* pQTaskFReader;
|
||||
};
|
||||
|
||||
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) {
|
||||
int32_t code = 0;
|
||||
SRsmaSnapReader* pReader = NULL;
|
||||
|
||||
// alloc
|
||||
pReader = (SRsmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
|
||||
if (pReader == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pReader->pSma = pSma;
|
||||
pReader->sver = sver;
|
||||
pReader->ever = ever;
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]);
|
||||
if (code < 0) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
*ppReader = pReader;
|
||||
smaInfo("vgId:%d vnode snapshot rsma reader opened succeed", SMA_VID(pSma));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_err:
|
||||
smaError("vgId:%d vnode snapshot rsma reader opened failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
SSma* pSma = pReader->pSma;
|
||||
|
||||
_exit:
|
||||
smaInfo("vgId:%d vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) {
|
||||
int32_t code = 0;
|
||||
|
||||
*ppData = NULL;
|
||||
|
||||
smaInfo("vgId:%d vnode snapshot rsma read entry", SMA_VID(pReader->pSma));
|
||||
// read rsma1/rsma2 file
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
STsdbSnapReader* pTsdbSnapReader = pReader->pDataReader[i];
|
||||
if (!pTsdbSnapReader) {
|
||||
continue;
|
||||
}
|
||||
if (!pReader->rsmaDataDone[i]) {
|
||||
smaInfo("vgId:%d vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i);
|
||||
code = tsdbSnapRead(pTsdbSnapReader, ppData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
} else {
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
pReader->rsmaDataDone[i] = 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
smaInfo("vgId:%d vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i);
|
||||
}
|
||||
}
|
||||
|
||||
// read qtaskinfo file
|
||||
if (!pReader->qTaskDone) {
|
||||
code = rsmaSnapReadQTaskInfo(pReader, ppData);
|
||||
if (code) {
|
||||
goto _err;
|
||||
} else {
|
||||
if (*ppData) {
|
||||
goto _exit;
|
||||
} else {
|
||||
pReader->qTaskDone = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
smaInfo("vgId:%d vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
|
||||
int32_t code = 0;
|
||||
SRsmaSnapReader* pReader = *ppReader;
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pReader->pDataReader[i]) {
|
||||
tsdbSnapReaderClose(&pReader->pDataReader[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (pReader->pQTaskFReader) {
|
||||
// TODO: close for qtaskinfo
|
||||
smaInfo("vgId:%d vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
|
||||
}
|
||||
|
||||
|
||||
smaInfo("vgId:%d vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
|
||||
|
||||
taosMemoryFreeClear(*ppReader);
|
||||
return code;
|
||||
}
|
||||
|
||||
// SRsmaSnapWriter ========================================
|
||||
struct SRsmaSnapWriter {
|
||||
SSma* pSma;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
|
||||
// config
|
||||
int64_t commitID;
|
||||
|
||||
// for data file
|
||||
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
|
||||
|
||||
// for qtaskinfo file
|
||||
SQTaskFReader* pQTaskFReader;
|
||||
SQTaskFWriter* pQTaskFWriter;
|
||||
};
|
||||
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) {
|
||||
int32_t code = 0;
|
||||
SRsmaSnapWriter* pWriter = NULL;
|
||||
|
||||
// alloc
|
||||
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
|
||||
if (pWriter == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
pWriter->pSma = pSma;
|
||||
pWriter->sver = sver;
|
||||
pWriter->ever = ever;
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
|
||||
if (code < 0) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// qtaskinfo
|
||||
// TODO
|
||||
|
||||
*ppWriter = pWriter;
|
||||
|
||||
smaInfo("vgId:%d rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code));
|
||||
*ppWriter = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
SRsmaSnapWriter* pWriter = *ppWriter;
|
||||
|
||||
if (rollback) {
|
||||
ASSERT(0);
|
||||
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
|
||||
// if (code) goto _err;
|
||||
} else {
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pWriter->pDataWriter[i]) {
|
||||
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
|
||||
if (code) goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
|
||||
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
|
||||
|
||||
// rsma1/rsma2
|
||||
if (pHdr->type == SNAP_DATA_RSMA1) {
|
||||
pHdr->type = SNAP_DATA_TSDB;
|
||||
code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData);
|
||||
} else if (pHdr->type == SNAP_DATA_RSMA2) {
|
||||
pHdr->type = SNAP_DATA_TSDB;
|
||||
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
|
||||
} else if (pHdr->type == SNAP_DATA_QTASK) {
|
||||
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
|
||||
}
|
||||
if (code < 0) goto _err;
|
||||
|
||||
_exit:
|
||||
smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
|
||||
tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pWriter->pQTaskFWriter == NULL) {
|
||||
// SDelFile* pDelFile = pWriter->fs.pDelFile;
|
||||
|
||||
// // reader
|
||||
// if (pDelFile) {
|
||||
// code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
|
||||
// if (code) goto _err;
|
||||
|
||||
// code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
|
||||
// if (code) goto _err;
|
||||
// }
|
||||
|
||||
// // writer
|
||||
// SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
|
||||
// code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
|
||||
// if (code) goto _err;
|
||||
}
|
||||
smaInfo("vgId:%d vnode snapshot rsma write qtaskinfo succeed", SMA_VID(pWriter->pSma));
|
||||
_exit:
|
||||
return code;
|
||||
|
||||
_err:
|
||||
smaError("vgId:%d vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
|
||||
return code;
|
||||
}
|
|
@ -318,6 +318,7 @@ typedef struct STableScanInfo {
|
|||
int32_t currentTable;
|
||||
int8_t scanMode;
|
||||
int8_t noTable;
|
||||
int8_t assignBlockUid;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STableMergeScanInfo {
|
||||
|
|
|
@ -264,6 +264,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
}
|
||||
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||
if (pTaskInfo->tableqinfoList.map == NULL) {
|
||||
pTaskInfo->tableqinfoList.map =
|
||||
taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
}
|
||||
|
||||
if (keyBuf != NULL) {
|
||||
|
|
|
@ -408,6 +408,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
pBlock->info.groupId = *groupId;
|
||||
}
|
||||
|
||||
if (pTableScanInfo->assignBlockUid) {
|
||||
pBlock->info.groupId = pBlock->info.uid;
|
||||
}
|
||||
|
||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
|
@ -616,6 +620,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
pInfo->scanFlag = MAIN_SCAN;
|
||||
pInfo->pColMatchInfo = pColList;
|
||||
pInfo->currentGroupId = -1;
|
||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||
|
||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||
|
|
|
@ -1522,6 +1522,7 @@ static const char* jkTableScanPhysiPlanWatermark = "Watermark";
|
|||
static const char* jkTableScanPhysiPlanIgnoreExpired = "IgnoreExpired";
|
||||
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
|
||||
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
|
||||
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1578,6 +1579,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1637,6 +1641,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -4525,7 +4532,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
return jsonToPhysiScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
|
||||
|
||||
return jsonToPhysiLastRowScanNode(pJson, pObj);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
|
||||
|
|
|
@ -132,7 +132,7 @@ print ===> write 100 records
|
|||
$N = 100
|
||||
$count = 0
|
||||
while $count < $N
|
||||
$ms = 1591200000000 + $count
|
||||
$ms = 1658924000000 + $count
|
||||
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
|
||||
$count = $count + 1
|
||||
endw
|
||||
|
@ -149,7 +149,7 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
|
||||
|
||||
sleep 3000
|
||||
|
||||
########################################################
|
||||
print ===> start dnode1 dnode2 dnode3 dnode4
|
||||
|
|
Loading…
Reference in New Issue