From 5a0ae19cbc619b79ba32cc92cfb8f4f902a2f35f Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 1 May 2022 10:50:23 +0800 Subject: [PATCH] feat: submit req msg iter refactor --- include/common/tmsg.h | 19 ++++----- source/common/src/tmsg.c | 23 +++++------ source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tqRead.c | 12 +++--- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 48 +++++----------------- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 17 ++++++-- source/dnode/vnode/src/tsdb/tsdbSma.c | 12 +++--- source/dnode/vnode/src/tsdb/tsdbWrite.c | 20 ++++----- source/dnode/vnode/src/vnd/vnodeOpen.c | 14 +++---- 11 files changed, 74 insertions(+), 97 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7e049b1dd9..d5d052c80a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -240,21 +240,18 @@ typedef struct { // head of SSubmitBlk const void* pMsg; } SSubmitMsgIter; - +#if 0 +int32_t tInitSubmitMsgIterOrigin(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); +int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int32_t tInitSubmitBlkIterOrigin(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +STSRow* tGetSubmitBlkNextOrigin(SSubmitBlkIter* pIter); +#endif +// TODO: KEEP one suite of iterator API finally. int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); -// TODO: KEEP one suite of iterator API finally. -// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts -// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later -// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter -int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); -int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); -STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); - typedef struct { int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 874e09b403..469d336cdd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,8 +27,8 @@ #define TD_MSG_DICT_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" - -int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { +#if 0 +int32_t tInitSubmitMsgIterOrigin(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; @@ -46,7 +46,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { return 0; } -int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { +int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ASSERT(pIter->len >= 0); if (pIter->len == 0) { @@ -72,7 +72,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { return 0; } -int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { +int32_t tInitSubmitBlkIterOrigin(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { if (pBlock->dataLen <= 0) return -1; pIter->totalLen = pBlock->dataLen; pIter->len = 0; @@ -80,7 +80,7 @@ int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { return 0; } -STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { +STSRow *tGetSubmitBlkNextOrigin(SSubmitBlkIter *pIter) { STSRow *row = pIter->row; if (pIter->len >= pIter->totalLen) { @@ -93,13 +93,10 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { return row; } } +#endif // TODO: KEEP one suite of iterator API finally. -// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts -// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later -// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter - -int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { +int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; @@ -117,7 +114,7 @@ int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { return 0; } -int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { +int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ASSERT(pIter->len >= 0); if (pIter->len == 0) { @@ -152,7 +149,7 @@ int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { return 0; } -int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { +int32_t tInitSubmitBlkIter(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { if (pMsgIter->dataLen <= 0) return -1; pIter->totalLen = pMsgIter->dataLen; pIter->len = 0; @@ -160,7 +157,7 @@ int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubm return 0; } -STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { +STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { STSRow *row = pIter->row; if (pIter->len >= pIter->totalLen) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9ac8434949..427ce8c1b6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -40,7 +40,7 @@ typedef struct STable STable; int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable); -int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows); +int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows); int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5930dcaf96..92923d4876 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -188,7 +188,7 @@ struct STbUidStore { #define TD_VID(PVNODE) (PVNODE)->config.vgId -static FORCE_INLINE bool tsdbIsRollup(SVnode* pVnode) { +static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) { SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]); return (pRetention->freq > 0 && pRetention->keep > 0); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 89ec55cca1..511d57ed58 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -37,9 +37,9 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); // iterate and convert - if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; + if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; while (true) { - if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; + if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (pReadHandle->pBlock == NULL) break; // pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); @@ -50,7 +50,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t // pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); } - if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; + if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; pReadHandle->ver = ver; memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); return 0; @@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t bool tqNextDataBlock(STqReadHandle* pHandle) { while (1) { - if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; } if (pHandle->pBlock == NULL) return false; @@ -169,8 +169,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p tdSTSRowIterInit(&iter, pTschema); STSRow* row; int32_t curRow = 0; - tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); - while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) { + tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); + while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { tdSTSRowIterReset(&iter, row); // get all wanted col of that block for (int32_t i = 0; i < colActual; i++) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 48e672d9bc..ff4d99f510 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -190,35 +190,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) { - ASSERT(pMsg != NULL); - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; - - terrno = TSDB_CODE_SUCCESS; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - - pBlock->uid = htobe64(pBlock->uid); - pBlock->suid = htobe64(pBlock->suid); - pBlock->sversion = htonl(pBlock->sversion); - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); - } - - if (terrno != TSDB_CODE_SUCCESS) return -1; - return 0; -} - -int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows) { +int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) { // STsdbMeta *pMeta = pRepo->tsdbMeta; // int32_t points = 0; // STable *pTable = NULL; @@ -232,15 +204,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows SSubmitBlk *pBlkCopy; // create container is nedd - tptr = taosHashGet(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid)); + tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid)); if (tptr == NULL) { - pTbData = tsdbNewTbData(pBlock->uid); + pTbData = tsdbNewTbData(pMsgIter->uid); if (pTbData == NULL) { return -1; } // Put into hash - taosHashPut(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid), &(pTbData), sizeof(pTbData)); + taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData)); // Put into skiplist tSkipListPut(pMemTable->pSlIdx, pTbData); @@ -249,10 +221,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows } // copy data to buffer pool - pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pBlock->dataLen + sizeof(*pBlock)); - memcpy(pBlkCopy, pBlock, pBlock->dataLen + sizeof(*pBlock)); + pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pMsgIter->dataLen + sizeof(*pBlock)); + memcpy(pBlkCopy, pBlock, pMsgIter->dataLen + sizeof(*pBlock)); - tInitSubmitBlkIter(pBlkCopy, &blkIter); + tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter); if (blkIter.row == NULL) return 0; keyMin = TD_ROW_KEY(blkIter.row); @@ -261,15 +233,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows // Set statistics keyMax = TD_ROW_KEY(blkIter.row); - pTbData->nrows += pBlock->numOfRows; + pTbData->nrows += pMsgIter->numOfRows; if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin; if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax; - pMemTable->nRow += pBlock->numOfRows; + pMemTable->nRow += pMsgIter->numOfRows; if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; - (*pAffectedRows) += pBlock->numOfRows; + (*pAffectedRows) += pMsgIter->numOfRows; return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 0827ba6eab..da363d9bc9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -67,7 +67,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) { goto _err; } - tsdbDebug("vgId: %d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path); + tsdbDebug("vgId:%d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path); *ppTsdb = pTsdb; return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7d3f7cadc9..2b8558c5ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -351,14 +351,25 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData } } +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) { + if (vnodeIsRollup(pVnode)) { + // for(int32_t i=0; i< TSDB_; ) { + + // } + } + return pVnode->pTsdb; +} + static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle)); if (pReadHandle == NULL) { goto _end; } + STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions); + pReadHandle->order = pCond->order; - pReadHandle->pTsdb = pVnode->pTsdb; + pReadHandle->pTsdb = pTsdb; pReadHandle->type = TSDB_QUERY_TYPE_ALL; pReadHandle->cur.fid = INT32_MIN; pReadHandle->cur.win = TSWINDOW_INITIALIZER; @@ -376,7 +387,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); pReadHandle->idStr = strdup(buf); - if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)pVnode->pTsdb) != 0) { + if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) { goto _end; } @@ -413,7 +424,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES); } - pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->pVnode->config.tsdbCfg.maxRows); + pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows); if (pReadHandle->pDataCols == NULL) { tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index d131cef8a2..380e376e24 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -702,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers SInterval interval = {0}; TSKEY lastWinSKey = INT64_MIN; - if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { + if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } while (true) { - tGetSubmitMsgNextEx(&msgIter, &pBlock); + tGetSubmitMsgNext(&msgIter, &pBlock); if (!pBlock) break; STSmaWrapper *pSW = NULL; STSma *pTSma = NULL; SSubmitBlkIter blkIter = {0}; - if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) { + if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) { pSW = tdFreeTSmaWrapper(pSW); break; } while (true) { - STSRow *row = tGetSubmitBlkNextEx(&blkIter); + STSRow *row = tGetSubmitBlkNext(&blkIter); if (!row) { tdFreeTSmaWrapper(pSW); break; @@ -1966,9 +1966,9 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { terrno = TSDB_CODE_SUCCESS; - if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1; + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { - if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1; + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (!pBlock) break; tsdbUidStorePut(pStore, msgIter.suid, NULL); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 9fb6aad472..88b637bc24 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -38,11 +38,11 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * while (true) { tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - if (tsdbInsertTableData(pTsdb, pBlock, &affectedrows) < 0) { + if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) { return -1; } - numOfRows += pBlock->numOfRows; + numOfRows += msgIter.numOfRows; } if (pRsp != NULL) { @@ -66,20 +66,20 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days; terrno = TSDB_CODE_SUCCESS; - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + // pMsg->length = htonl(pMsg->length); + // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; - pBlock->uid = htobe64(pBlock->uid); - pBlock->suid = htobe64(pBlock->suid); - pBlock->sversion = htonl(pBlock->sversion); - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); + // pBlock->uid = htobe64(pBlock->uid); + // pBlock->suid = htobe64(pBlock->suid); + // pBlock->sversion = htonl(pBlock->sversion); + // pBlock->dataLen = htonl(pBlock->dataLen); + // pBlock->schemaLen = htonl(pBlock->schemaLen); + // pBlock->numOfRows = htons(pBlock->numOfRows); #if 0 if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3737bcfe3b..ae134e6496 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -96,24 +96,24 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // open tsdb - if (tsdbIsRollup(pVnode)) { + if (vnodeIsRollup(pVnode)) { if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) { - vError("vgId: %d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); + vError("vgId:%d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) { - vError("vgId: %d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); + vError("vgId:%d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) { - vError("vgId: %d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); + vError("vgId:%d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } } else { if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) { - vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); + vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } } @@ -160,8 +160,8 @@ _err: if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb); if (pVnode->pMeta) metaClose(pVnode->pMeta); - tsdbClose(VND_RSMA1(pVnode)); - tsdbClose(VND_RSMA2(pVnode)); + tsdbClose(VND_RSMA1(pVnode)); + tsdbClose(VND_RSMA2(pVnode)); tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); return NULL;