diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e516bddac1..faee6cc2fa 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; - printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type, + printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4b29b13abd..3505711cd0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -599,14 +599,14 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche SSubmitReq *pReq = NULL; // TODO: the schema update should be handled if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), + smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; } if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", + smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; } diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c new file mode 100644 index 0000000000..21dfd8a32d --- /dev/null +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sma.h" + +static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData); +static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); + +// SRsmaSnapReader ======================================== +struct SRsmaSnapReader { + SSma* pSma; + int64_t sver; + int64_t ever; + + // for data file + int8_t rsmaDataDone[TSDB_RETENTION_L2]; + STsdbSnapReader* pDataReader[TSDB_RETENTION_L2]; + + // for qtaskinfo file + int8_t qTaskDone; + SQTaskFReader* pQTaskFReader; +}; + +int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) { + int32_t code = 0; + SRsmaSnapReader* pReader = NULL; + + // alloc + pReader = (SRsmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); + if (pReader == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pReader->pSma = pSma; + pReader->sver = sver; + pReader->ever = ever; + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pSma->pRSmaTsdb[i]) { + code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]); + if (code < 0) { + goto _err; + } + } + } + *ppReader = pReader; + smaInfo("vgId:%d vnode snapshot rsma reader opened succeed", SMA_VID(pSma)); + return TSDB_CODE_SUCCESS; +_err: + smaError("vgId:%d vnode snapshot rsma reader opened failed since %s", SMA_VID(pSma), tstrerror(code)); + return TSDB_CODE_FAILED; +} + +static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + SSma* pSma = pReader->pSma; + +_exit: + smaInfo("vgId:%d vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma)); + return code; + +_err: + smaError("vgId:%d vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code)); + return code; +} + +int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + + *ppData = NULL; + + smaInfo("vgId:%d vnode snapshot rsma read entry", SMA_VID(pReader->pSma)); + // read rsma1/rsma2 file + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + STsdbSnapReader* pTsdbSnapReader = pReader->pDataReader[i]; + if (!pTsdbSnapReader) { + continue; + } + if (!pReader->rsmaDataDone[i]) { + smaInfo("vgId:%d vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i); + code = tsdbSnapRead(pTsdbSnapReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->rsmaDataDone[i] = 1; + } + } + } else { + smaInfo("vgId:%d vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i); + } + } + + // read qtaskinfo file + if (!pReader->qTaskDone) { + code = rsmaSnapReadQTaskInfo(pReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->qTaskDone = 1; + } + } + } + +_exit: + smaInfo("vgId:%d vnode snapshot rsma read succeed", SMA_VID(pReader->pSma)); + return code; + +_err: + smaError("vgId:%d vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); + return code; +} + +int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) { + int32_t code = 0; + SRsmaSnapReader* pReader = *ppReader; + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pReader->pDataReader[i]) { + tsdbSnapReaderClose(&pReader->pDataReader[i]); + } + } + + if (pReader->pQTaskFReader) { + // TODO: close for qtaskinfo + smaInfo("vgId:%d vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma)); + } + + + smaInfo("vgId:%d vnode snapshot rsma reader closed", SMA_VID(pReader->pSma)); + + taosMemoryFreeClear(*ppReader); + return code; +} + +// SRsmaSnapWriter ======================================== +struct SRsmaSnapWriter { + SSma* pSma; + int64_t sver; + int64_t ever; + + // config + int64_t commitID; + + // for data file + STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2]; + + // for qtaskinfo file + SQTaskFReader* pQTaskFReader; + SQTaskFWriter* pQTaskFWriter; +}; + +int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) { + int32_t code = 0; + SRsmaSnapWriter* pWriter = NULL; + + // alloc + pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + if (pWriter == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->pSma = pSma; + pWriter->sver = sver; + pWriter->ever = ever; + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pSma->pRSmaTsdb[i]) { + code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]); + if (code < 0) { + goto _err; + } + } + } + + // qtaskinfo + // TODO + + *ppWriter = pWriter; + + smaInfo("vgId:%d rsma snapshot writer open succeed", TD_VID(pSma->pVnode)); + return code; + +_err: + smaError("vgId:%d rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code)); + *ppWriter = NULL; + return code; +} + +int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { + int32_t code = 0; + SRsmaSnapWriter* pWriter = *ppWriter; + + if (rollback) { + ASSERT(0); + // code = tsdbFSRollback(pWriter->pTsdb->pFS); + // if (code) goto _err; + } else { + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pWriter->pDataWriter[i]) { + code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback); + if (code) goto _err; + } + } + } + + taosMemoryFree(pWriter); + *ppWriter = NULL; + + smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma)); + return code; + +_err: + smaError("vgId:%d vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); + return code; +} + +int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + + // rsma1/rsma2 + if (pHdr->type == SNAP_DATA_RSMA1) { + pHdr->type = SNAP_DATA_TSDB; + code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData); + } else if (pHdr->type == SNAP_DATA_RSMA2) { + pHdr->type = SNAP_DATA_TSDB; + code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData); + } else if (pHdr->type == SNAP_DATA_QTASK) { + code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); + } + if (code < 0) goto _err; + +_exit: + smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); + return code; + +_err: + smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, + tstrerror(code)); + return code; +} + +static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + + if (pWriter->pQTaskFWriter == NULL) { + // SDelFile* pDelFile = pWriter->fs.pDelFile; + + // // reader + // if (pDelFile) { + // code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL); + // if (code) goto _err; + + // code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL); + // if (code) goto _err; + // } + + // // writer + // SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; + // code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); + // if (code) goto _err; + } + smaInfo("vgId:%d vnode snapshot rsma write qtaskinfo succeed", SMA_VID(pWriter->pSma)); +_exit: + return code; + +_err: + smaError("vgId:%d vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); + return code; +} diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d5486d62b1..a80c2c2fea 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -318,6 +318,7 @@ typedef struct STableScanInfo { int32_t currentTable; int8_t scanMode; int8_t noTable; + int8_t assignBlockUid; } STableScanInfo; typedef struct STableMergeScanInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1618bffb09..7f88e628c1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -264,6 +264,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); + if (pTaskInfo->tableqinfoList.map == NULL) { + pTaskInfo->tableqinfoList.map = + taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + } + + taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } if (keyBuf != NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f07256e88e..4a2f57d628 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -408,6 +408,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info.groupId = *groupId; } + if (pTableScanInfo->assignBlockUid) { + pBlock->info.groupId = pBlock->info.uid; + } + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -616,6 +620,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->scanFlag = MAIN_SCAN; pInfo->pColMatchInfo = pColList; pInfo->currentGroupId = -1; + pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pOperator->name = "TableScanOperator"; // for debug purpose pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 1c9c097cba..af3f0c242b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1522,6 +1522,7 @@ static const char* jkTableScanPhysiPlanWatermark = "Watermark"; static const char* jkTableScanPhysiPlanIgnoreExpired = "IgnoreExpired"; static const char* jkTableScanPhysiPlanGroupTags = "GroupTags"; static const char* jkTableScanPhysiPlanGroupSort = "GroupSort"; +static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1578,6 +1579,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid); + } return code; } @@ -1637,6 +1641,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid); + } return code; } @@ -4525,7 +4532,6 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: return jsonToPhysiScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: - return jsonToPhysiLastRowScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: diff --git a/tests/script/tsim/sync/vnodesnapshot-test.sim b/tests/script/tsim/sync/vnodesnapshot-test.sim index c3d8a243d3..9f4cd37b6d 100644 --- a/tests/script/tsim/sync/vnodesnapshot-test.sim +++ b/tests/script/tsim/sync/vnodesnapshot-test.sim @@ -132,7 +132,7 @@ print ===> write 100 records $N = 100 $count = 0 while $count < $N - $ms = 1591200000000 + $count + $ms = 1658924000000 + $count sql insert into ct1 values( $ms , $count , 2.1, 3.1) $count = $count + 1 endw @@ -149,7 +149,7 @@ system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT - +sleep 3000 ######################################################## print ===> start dnode1 dnode2 dnode3 dnode4