enh: remove obsolete codes for raw data row
This commit is contained in:
parent
847e351b92
commit
6d0d42569a
|
@ -290,6 +290,7 @@ typedef struct {
|
|||
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
||||
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
|
||||
|
||||
#if 0
|
||||
// TODO remove this function
|
||||
static FORCE_INLINE bool isNull(const void *val, int32_t type) {
|
||||
switch (type) {
|
||||
|
@ -325,6 +326,7 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) {
|
|||
return false;
|
||||
};
|
||||
}
|
||||
#endif
|
||||
|
||||
typedef struct tDataTypeDescriptor {
|
||||
int16_t type;
|
||||
|
|
|
@ -347,11 +347,6 @@ typedef struct SInsertStmt {
|
|||
uint8_t precision;
|
||||
} SInsertStmt;
|
||||
|
||||
typedef enum {
|
||||
PAYLOAD_TYPE_KV = 0,
|
||||
PAYLOAD_TYPE_RAW = 1,
|
||||
} EPayloadType;
|
||||
|
||||
typedef struct SVgDataBlocks {
|
||||
SVgroupInfo vg;
|
||||
int32_t numOfTables; // number of tables in current submit block
|
||||
|
@ -363,7 +358,6 @@ typedef struct SVnodeModifOpStmt {
|
|||
ENodeType nodeType;
|
||||
ENodeType sqlNodeType;
|
||||
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
|
||||
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||
uint32_t insertType; // insert data from [file|sql statement| bound statement]
|
||||
const char* sql; // current sql statement position
|
||||
} SVnodeModifOpStmt;
|
||||
|
|
|
@ -1369,7 +1369,6 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
|
|||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
|
@ -1625,7 +1624,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
|
|||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
|
@ -1929,7 +1927,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
|
|||
goto end;
|
||||
}
|
||||
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||
|
|
|
@ -1531,7 +1531,6 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
|||
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
|
||||
goto cleanup;
|
||||
}
|
||||
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
if (pTscObj) {
|
||||
info->taos = pTscObj;
|
||||
|
|
|
@ -152,7 +152,7 @@ int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataC
|
|||
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
|
||||
SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
|
||||
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks);
|
||||
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks);
|
||||
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
|
||||
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
|
||||
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
|
||||
|
|
|
@ -1369,7 +1369,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
|
||||
// merge according to vgId
|
||||
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
|
||||
}
|
||||
return insBuildOutput(pCxt);
|
||||
}
|
||||
|
@ -1390,7 +1390,7 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) {
|
|||
parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
|
||||
// merge according to vgId
|
||||
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks));
|
||||
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
|
||||
}
|
||||
return insBuildOutput(pCxt);
|
||||
}
|
||||
|
@ -1472,8 +1472,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
|
|||
}
|
||||
}
|
||||
|
||||
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!context.pComCxt->needMultiParse) {
|
||||
code = skipInsertInto(&context.pSql, &context.msg);
|
||||
|
|
|
@ -40,8 +40,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
|
|||
|
||||
// merge according to vgId
|
||||
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
|
||||
CHECK_CODE(
|
||||
insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
|
||||
CHECK_CODE(insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, &insertCtx.pVgDataBlocks));
|
||||
}
|
||||
|
||||
CHECK_CODE(insBuildOutput(&insertCtx));
|
||||
|
|
|
@ -21,9 +21,6 @@
|
|||
#include "querynodes.h"
|
||||
#include "tRealloc.h"
|
||||
|
||||
#define IS_RAW_PAYLOAD(t) \
|
||||
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
|
||||
|
||||
typedef struct SBlockKeyTuple {
|
||||
TSKEY skey;
|
||||
void* payloadAddr;
|
||||
|
@ -315,7 +312,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||
int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
|
||||
int32_t columns = getNumOfColumns(pTableMeta);
|
||||
|
@ -328,6 +325,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
|||
result += (int32_t)TD_BITMAP_BYTES(columns - 1);
|
||||
return result;
|
||||
}
|
||||
#endif
|
||||
|
||||
void insDestroyBlockArrayList(SArray* pDataBlockList) {
|
||||
if (pDataBlockList == NULL) {
|
||||
|
@ -359,6 +357,7 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
|
|||
taosHashCleanup(pDataBlockHash);
|
||||
}
|
||||
|
||||
#if 0
|
||||
// data block is disordered, sort it in ascending order
|
||||
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
|
||||
|
@ -401,6 +400,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
|
|||
|
||||
dataBuf->prevTS = INT64_MIN;
|
||||
}
|
||||
#endif
|
||||
|
||||
// data block is disordered, sort it in ascending order
|
||||
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
|
||||
|
@ -667,52 +667,17 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
|
|||
}
|
||||
|
||||
// Erase the empty space reserved for binary data
|
||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
|
||||
bool isRawPayload) {
|
||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) {
|
||||
// TODO: optimize this function, handle the case while binary is not presented
|
||||
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
||||
STableComInfo tinfo = getTableInfo(pTableMeta);
|
||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||
|
||||
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
|
||||
SSubmitBlk* pBlock = pDataBlock;
|
||||
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
|
||||
pDataBlock = (char*)pDataBlock + nonDataLen;
|
||||
|
||||
int32_t flen = 0; // original total length of row
|
||||
if (isRawPayload) {
|
||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||
flen += TYPE_BYTES[pSchema[j].type];
|
||||
}
|
||||
}
|
||||
pBlock->schemaLen = pTableDataBlock->createTbReqLen;
|
||||
|
||||
char* p = pTableDataBlock->pData + nonDataLen;
|
||||
pBlock->dataLen = 0;
|
||||
|
||||
int32_t numOfRows = pBlock->numOfRows;
|
||||
|
||||
if (isRawPayload) {
|
||||
SRowBuilder builder = {0};
|
||||
|
||||
tdSRowInit(&builder, pTableMeta->sversion);
|
||||
tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen);
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
tdSRowResetBuf(&builder, pDataBlock);
|
||||
int toffset = 0;
|
||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||
int8_t colType = pSchema[j].type;
|
||||
uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
|
||||
tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
|
||||
toffset += TYPE_BYTES[colType];
|
||||
p += pSchema[j].bytes;
|
||||
}
|
||||
tdSRowEnd(&builder);
|
||||
int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
|
||||
pDataBlock = (char*)pDataBlock + rowLen;
|
||||
pBlock->dataLen += rowLen;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
void* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
|
||||
|
@ -720,15 +685,13 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
|
|||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
}
|
||||
|
||||
return pBlock->dataLen + pBlock->schemaLen;
|
||||
}
|
||||
|
||||
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) {
|
||||
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) {
|
||||
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
||||
int code = 0;
|
||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||
|
||||
|
@ -754,8 +717,7 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
|
|||
}
|
||||
ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
|
||||
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size +
|
||||
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
|
||||
pOneTableBlock->createTbReqLen;
|
||||
|
||||
|
@ -774,9 +736,6 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
|
|||
}
|
||||
}
|
||||
|
||||
if (isRawPayload) {
|
||||
sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
|
||||
} else {
|
||||
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
|
||||
tdFreeSBlockRowMerger(pBlkRowMerger);
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
|
@ -786,11 +745,9 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
|
|||
return code;
|
||||
}
|
||||
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
|
||||
}
|
||||
|
||||
// erase the empty space reserved for binary data
|
||||
int32_t finalLen =
|
||||
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
|
||||
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple);
|
||||
|
||||
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
|
||||
assert(dataBuf->size <= dataBuf->nAllocSize);
|
||||
|
|
Loading…
Reference in New Issue