Merge pull request #12081 from taosdata/feature/TD-14481-3.0
feat: submit req msg iter refactor
This commit is contained in:
commit
b6de38015b
|
@ -240,21 +240,18 @@ typedef struct {
|
||||||
// head of SSubmitBlk
|
// head of SSubmitBlk
|
||||||
const void* pMsg;
|
const void* pMsg;
|
||||||
} SSubmitMsgIter;
|
} SSubmitMsgIter;
|
||||||
|
#if 0
|
||||||
|
int32_t tInitSubmitMsgIterOrigin(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
||||||
|
int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||||||
|
int32_t tInitSubmitBlkIterOrigin(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||||||
|
STSRow* tGetSubmitBlkNextOrigin(SSubmitBlkIter* pIter);
|
||||||
|
#endif
|
||||||
|
// TODO: KEEP one suite of iterator API finally.
|
||||||
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
||||||
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
||||||
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
||||||
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
||||||
|
|
||||||
// TODO: KEEP one suite of iterator API finally.
|
|
||||||
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
|
|
||||||
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
|
|
||||||
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
|
|
||||||
int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
|
|
||||||
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
|
|
||||||
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
|
|
||||||
STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t index; // index of failed block in submit blocks
|
int32_t index; // index of failed block in submit blocks
|
||||||
int32_t vnode; // vnode index of failed block
|
int32_t vnode; // vnode index of failed block
|
||||||
|
|
|
@ -27,8 +27,8 @@
|
||||||
#define TD_MSG_DICT_
|
#define TD_MSG_DICT_
|
||||||
#undef TD_MSG_SEG_CODE_
|
#undef TD_MSG_SEG_CODE_
|
||||||
#include "tmsgdef.h"
|
#include "tmsgdef.h"
|
||||||
|
#if 0
|
||||||
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
int32_t tInitSubmitMsgIterOrigin(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -46,7 +46,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
ASSERT(pIter->len >= 0);
|
ASSERT(pIter->len >= 0);
|
||||||
|
|
||||||
if (pIter->len == 0) {
|
if (pIter->len == 0) {
|
||||||
|
@ -72,7 +72,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
int32_t tInitSubmitBlkIterOrigin(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||||
if (pBlock->dataLen <= 0) return -1;
|
if (pBlock->dataLen <= 0) return -1;
|
||||||
pIter->totalLen = pBlock->dataLen;
|
pIter->totalLen = pBlock->dataLen;
|
||||||
pIter->len = 0;
|
pIter->len = 0;
|
||||||
|
@ -80,7 +80,7 @@ int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
STSRow *tGetSubmitBlkNextOrigin(SSubmitBlkIter *pIter) {
|
||||||
STSRow *row = pIter->row;
|
STSRow *row = pIter->row;
|
||||||
|
|
||||||
if (pIter->len >= pIter->totalLen) {
|
if (pIter->len >= pIter->totalLen) {
|
||||||
|
@ -93,13 +93,10 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||||
return row;
|
return row;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// TODO: KEEP one suite of iterator API finally.
|
// TODO: KEEP one suite of iterator API finally.
|
||||||
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
|
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
|
|
||||||
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
|
|
||||||
|
|
||||||
int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -117,7 +114,7 @@ int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
ASSERT(pIter->len >= 0);
|
ASSERT(pIter->len >= 0);
|
||||||
|
|
||||||
if (pIter->len == 0) {
|
if (pIter->len == 0) {
|
||||||
|
@ -152,7 +149,7 @@ int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
int32_t tInitSubmitBlkIter(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||||
if (pMsgIter->dataLen <= 0) return -1;
|
if (pMsgIter->dataLen <= 0) return -1;
|
||||||
pIter->totalLen = pMsgIter->dataLen;
|
pIter->totalLen = pMsgIter->dataLen;
|
||||||
pIter->len = 0;
|
pIter->len = 0;
|
||||||
|
@ -160,7 +157,7 @@ int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubm
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
|
STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||||
STSRow *row = pIter->row;
|
STSRow *row = pIter->row;
|
||||||
|
|
||||||
if (pIter->len >= pIter->totalLen) {
|
if (pIter->len >= pIter->totalLen) {
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct STable STable;
|
||||||
|
|
||||||
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
|
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
|
||||||
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
|
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
|
||||||
int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows);
|
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows);
|
||||||
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||||
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
||||||
|
|
||||||
|
|
|
@ -188,7 +188,7 @@ struct STbUidStore {
|
||||||
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE bool tsdbIsRollup(SVnode* pVnode) {
|
static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) {
|
||||||
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
|
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
|
||||||
return (pRetention->freq > 0 && pRetention->keep > 0);
|
return (pRetention->freq > 0 && pRetention->keep > 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,9 +37,9 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
|
||||||
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
|
|
||||||
// iterate and convert
|
// iterate and convert
|
||||||
if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
|
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
|
||||||
if (pReadHandle->pBlock == NULL) break;
|
if (pReadHandle->pBlock == NULL) break;
|
||||||
|
|
||||||
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
|
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
|
||||||
|
@ -50,7 +50,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
|
||||||
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
|
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
|
||||||
pReadHandle->ver = ver;
|
pReadHandle->ver = ver;
|
||||||
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
|
||||||
|
|
||||||
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
bool tqNextDataBlock(STqReadHandle* pHandle) {
|
||||||
while (1) {
|
while (1) {
|
||||||
if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (pHandle->pBlock == NULL) return false;
|
if (pHandle->pBlock == NULL) return false;
|
||||||
|
@ -169,8 +169,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
tdSTSRowIterInit(&iter, pTschema);
|
tdSTSRowIterInit(&iter, pTschema);
|
||||||
STSRow* row;
|
STSRow* row;
|
||||||
int32_t curRow = 0;
|
int32_t curRow = 0;
|
||||||
tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
|
tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
|
||||||
while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) {
|
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
|
||||||
tdSTSRowIterReset(&iter, row);
|
tdSTSRowIterReset(&iter, row);
|
||||||
// get all wanted col of that block
|
// get all wanted col of that block
|
||||||
for (int32_t i = 0; i < colActual; i++) {
|
for (int32_t i = 0; i < colActual; i++) {
|
||||||
|
|
|
@ -190,35 +190,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) {
|
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
SSubmitMsgIter msgIter = {0};
|
|
||||||
SSubmitBlk *pBlock = NULL;
|
|
||||||
SSubmitBlkIter blkIter = {0};
|
|
||||||
STSRow *row = NULL;
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
|
||||||
pMsg->length = htonl(pMsg->length);
|
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
|
||||||
while (true) {
|
|
||||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
|
||||||
if (pBlock == NULL) break;
|
|
||||||
|
|
||||||
pBlock->uid = htobe64(pBlock->uid);
|
|
||||||
pBlock->suid = htobe64(pBlock->suid);
|
|
||||||
pBlock->sversion = htonl(pBlock->sversion);
|
|
||||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
|
||||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
|
||||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
|
|
||||||
// STsdbMeta *pMeta = pRepo->tsdbMeta;
|
// STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
// int32_t points = 0;
|
// int32_t points = 0;
|
||||||
// STable *pTable = NULL;
|
// STable *pTable = NULL;
|
||||||
|
@ -232,15 +204,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows
|
||||||
SSubmitBlk *pBlkCopy;
|
SSubmitBlk *pBlkCopy;
|
||||||
|
|
||||||
// create container is nedd
|
// create container is nedd
|
||||||
tptr = taosHashGet(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid));
|
tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
|
||||||
if (tptr == NULL) {
|
if (tptr == NULL) {
|
||||||
pTbData = tsdbNewTbData(pBlock->uid);
|
pTbData = tsdbNewTbData(pMsgIter->uid);
|
||||||
if (pTbData == NULL) {
|
if (pTbData == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put into hash
|
// Put into hash
|
||||||
taosHashPut(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid), &(pTbData), sizeof(pTbData));
|
taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData));
|
||||||
|
|
||||||
// Put into skiplist
|
// Put into skiplist
|
||||||
tSkipListPut(pMemTable->pSlIdx, pTbData);
|
tSkipListPut(pMemTable->pSlIdx, pTbData);
|
||||||
|
@ -249,10 +221,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy data to buffer pool
|
// copy data to buffer pool
|
||||||
pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pBlock->dataLen + sizeof(*pBlock));
|
pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pMsgIter->dataLen + sizeof(*pBlock));
|
||||||
memcpy(pBlkCopy, pBlock, pBlock->dataLen + sizeof(*pBlock));
|
memcpy(pBlkCopy, pBlock, pMsgIter->dataLen + sizeof(*pBlock));
|
||||||
|
|
||||||
tInitSubmitBlkIter(pBlkCopy, &blkIter);
|
tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
|
||||||
if (blkIter.row == NULL) return 0;
|
if (blkIter.row == NULL) return 0;
|
||||||
keyMin = TD_ROW_KEY(blkIter.row);
|
keyMin = TD_ROW_KEY(blkIter.row);
|
||||||
|
|
||||||
|
@ -261,15 +233,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows
|
||||||
// Set statistics
|
// Set statistics
|
||||||
keyMax = TD_ROW_KEY(blkIter.row);
|
keyMax = TD_ROW_KEY(blkIter.row);
|
||||||
|
|
||||||
pTbData->nrows += pBlock->numOfRows;
|
pTbData->nrows += pMsgIter->numOfRows;
|
||||||
if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
|
if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
|
||||||
if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;
|
if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;
|
||||||
|
|
||||||
pMemTable->nRow += pBlock->numOfRows;
|
pMemTable->nRow += pMsgIter->numOfRows;
|
||||||
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
|
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
|
||||||
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
|
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
|
||||||
|
|
||||||
(*pAffectedRows) += pBlock->numOfRows;
|
(*pAffectedRows) += pMsgIter->numOfRows;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("vgId: %d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path);
|
tsdbDebug("vgId:%d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path);
|
||||||
|
|
||||||
*ppTsdb = pTsdb;
|
*ppTsdb = pTsdb;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -351,14 +351,25 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
|
||||||
|
if (vnodeIsRollup(pVnode)) {
|
||||||
|
// for(int32_t i=0; i< TSDB_; ) {
|
||||||
|
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
return pVnode->pTsdb;
|
||||||
|
}
|
||||||
|
|
||||||
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
|
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
|
||||||
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
|
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
|
||||||
if (pReadHandle == NULL) {
|
if (pReadHandle == NULL) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions);
|
||||||
|
|
||||||
pReadHandle->order = pCond->order;
|
pReadHandle->order = pCond->order;
|
||||||
pReadHandle->pTsdb = pVnode->pTsdb;
|
pReadHandle->pTsdb = pTsdb;
|
||||||
pReadHandle->type = TSDB_QUERY_TYPE_ALL;
|
pReadHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||||
pReadHandle->cur.fid = INT32_MIN;
|
pReadHandle->cur.fid = INT32_MIN;
|
||||||
pReadHandle->cur.win = TSWINDOW_INITIALIZER;
|
pReadHandle->cur.win = TSWINDOW_INITIALIZER;
|
||||||
|
@ -376,7 +387,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
||||||
snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
|
snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
|
||||||
pReadHandle->idStr = strdup(buf);
|
pReadHandle->idStr = strdup(buf);
|
||||||
|
|
||||||
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)pVnode->pTsdb) != 0) {
|
if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -413,7 +424,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
||||||
pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
|
pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->pVnode->config.tsdbCfg.maxRows);
|
pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
|
||||||
if (pReadHandle->pDataCols == NULL) {
|
if (pReadHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
|
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -702,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
|
||||||
SInterval interval = {0};
|
SInterval interval = {0};
|
||||||
TSKEY lastWinSKey = INT64_MIN;
|
TSKEY lastWinSKey = INT64_MIN;
|
||||||
|
|
||||||
if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
|
if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
tGetSubmitMsgNextEx(&msgIter, &pBlock);
|
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||||
if (!pBlock) break;
|
if (!pBlock) break;
|
||||||
|
|
||||||
STSmaWrapper *pSW = NULL;
|
STSmaWrapper *pSW = NULL;
|
||||||
STSma *pTSma = NULL;
|
STSma *pTSma = NULL;
|
||||||
|
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
|
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
|
||||||
pSW = tdFreeTSmaWrapper(pSW);
|
pSW = tdFreeTSmaWrapper(pSW);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
STSRow *row = tGetSubmitBlkNextEx(&blkIter);
|
STSRow *row = tGetSubmitBlkNext(&blkIter);
|
||||||
if (!row) {
|
if (!row) {
|
||||||
tdFreeTSmaWrapper(pSW);
|
tdFreeTSmaWrapper(pSW);
|
||||||
break;
|
break;
|
||||||
|
@ -1966,9 +1966,9 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1;
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
|
|
||||||
if (!pBlock) break;
|
if (!pBlock) break;
|
||||||
tsdbUidStorePut(pStore, msgIter.suid, NULL);
|
tsdbUidStorePut(pStore, msgIter.suid, NULL);
|
||||||
|
|
|
@ -38,11 +38,11 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
||||||
while (true) {
|
while (true) {
|
||||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
if (tsdbInsertTableData(pTsdb, pBlock, &affectedrows) < 0) {
|
if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows += pBlock->numOfRows;
|
numOfRows += msgIter.numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
|
@ -66,20 +66,20 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||||
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
|
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
pMsg->length = htonl(pMsg->length);
|
// pMsg->length = htonl(pMsg->length);
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
|
|
||||||
pBlock->uid = htobe64(pBlock->uid);
|
// pBlock->uid = htobe64(pBlock->uid);
|
||||||
pBlock->suid = htobe64(pBlock->suid);
|
// pBlock->suid = htobe64(pBlock->suid);
|
||||||
pBlock->sversion = htonl(pBlock->sversion);
|
// pBlock->sversion = htonl(pBlock->sversion);
|
||||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
// pBlock->dataLen = htonl(pBlock->dataLen);
|
||||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
// pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
// pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
||||||
|
|
|
@ -96,24 +96,24 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// open tsdb
|
// open tsdb
|
||||||
if (tsdbIsRollup(pVnode)) {
|
if (vnodeIsRollup(pVnode)) {
|
||||||
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) {
|
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) {
|
||||||
vError("vgId: %d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) {
|
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) {
|
||||||
vError("vgId: %d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) {
|
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) {
|
||||||
vError("vgId: %d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) {
|
if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) {
|
||||||
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue