enh: support column mode write
This commit is contained in:
parent
bffb92b911
commit
34662ea10a
|
@ -113,6 +113,7 @@ int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, voi
|
|||
void tColDataDestroy(void *ph);
|
||||
void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn);
|
||||
void tColDataClear(SColData *pColData);
|
||||
void tColDataDeepClear(SColData *pColData);
|
||||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
||||
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
|
||||
|
|
|
@ -87,7 +87,6 @@ void qCleanupKeywordsTable();
|
|||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
|
||||
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
|
||||
void qFreeStmtDataBlock(void* pDataBlock);
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId);
|
||||
void qDestroyStmtDataBlock(void* pBlock);
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock);
|
||||
|
|
|
@ -21,7 +21,7 @@ extern "C" {
|
|||
#endif
|
||||
#include "catalog.h"
|
||||
|
||||
typedef void STableDataBlocks;
|
||||
typedef void STableDataCxt;
|
||||
|
||||
typedef enum {
|
||||
STMT_TYPE_INSERT = 1,
|
||||
|
@ -43,8 +43,8 @@ typedef enum {
|
|||
} STMT_STATUS;
|
||||
|
||||
typedef struct SStmtTableCache {
|
||||
STableDataBlocks *pDataBlock;
|
||||
void *boundTags;
|
||||
STableDataCxt *pDataCtx;
|
||||
void *boundTags;
|
||||
} SStmtTableCache;
|
||||
|
||||
typedef struct SStmtQueryResInfo {
|
||||
|
|
|
@ -289,22 +289,18 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
|||
size_t keyLen = 0;
|
||||
void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||
while (pIter) {
|
||||
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
|
||||
char* key = taosHashGetKey(pIter, &keyLen);
|
||||
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
|
||||
STableDataCxt* pBlocks = *(STableDataCxt*)pIter;
|
||||
char* key = taosHashGetKey(pIter, &keyLen);
|
||||
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
|
||||
|
||||
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
|
||||
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
|
||||
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
|
||||
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (STMT_TYPE_MULTI_INSERT == pStmt->sql.type) {
|
||||
qFreeStmtDataBlock(pBlocks);
|
||||
} else {
|
||||
qDestroyStmtDataBlock(pBlocks);
|
||||
}
|
||||
qDestroyStmtDataBlock(pBlocks);
|
||||
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
|
||||
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
|
@ -354,7 +350,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
|
||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid) {
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
SVgroupInfo vgInfo = {0};
|
||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||
|
@ -375,9 +371,9 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
pStmt->bInfo.needParse = true;
|
||||
pStmt->bInfo.inExecCache = false;
|
||||
|
||||
STableDataBlocks* pBlockInExec =
|
||||
STableDataCxt* pCxtInExec =
|
||||
taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (pBlockInExec) {
|
||||
if (pCxtInExec) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
pStmt->bInfo.inExecCache = true;
|
||||
|
||||
|
@ -411,8 +407,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
|
||||
pStmt->bInfo.tbUid = 0;
|
||||
|
||||
STableDataBlocks* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, 0));
|
||||
STableDataCxt* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0));
|
||||
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
||||
POINTER_BYTES)) {
|
||||
|
@ -489,8 +485,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
pStmt->bInfo.boundTags = pCache->boundTags;
|
||||
pStmt->bInfo.tagsCached = true;
|
||||
|
||||
STableDataBlocks* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, uid));
|
||||
STableDataCxt* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid));
|
||||
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
|
||||
POINTER_BYTES)) {
|
||||
|
@ -614,8 +610,8 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock =
|
||||
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
STableDataCxt** pDataBlock =
|
||||
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
|
@ -637,8 +633,8 @@ int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
|
|||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock =
|
||||
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
STableDataCxt** pDataBlock =
|
||||
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
|
@ -655,8 +651,8 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
|
|||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock =
|
||||
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
STableDataCxt** pDataBlock =
|
||||
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
|
@ -729,8 +725,8 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock =
|
||||
(STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
STableDataCxt** pDataBlock =
|
||||
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
|
@ -779,7 +775,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
|||
int32_t code = 0;
|
||||
int32_t finalCode = 0;
|
||||
size_t keyLen = 0;
|
||||
STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||
STableDataCxt** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||
while (pIter) {
|
||||
STableDataBlocks* pBlock = *pIter;
|
||||
char* key = taosHashGetKey(pIter, &keyLen);
|
||||
|
|
|
@ -1547,6 +1547,14 @@ void tColDataClear(SColData *pColData) {
|
|||
pColData->nData = 0;
|
||||
}
|
||||
|
||||
void tColDataDeepClear(SColData *pColData) {
|
||||
pColData->pBitMap = NULL;
|
||||
pColData->aOffset = NULL;
|
||||
pColData->pData = NULL;
|
||||
|
||||
tColDataClear(pColData);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData, uint32_t nData) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
|
|
@ -142,7 +142,7 @@ int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray
|
|||
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
|
||||
|
||||
typedef struct SBoundColInfo {
|
||||
int32_t *pColIndex; // bound index => schema index
|
||||
int16_t *pColIndex; // bound index => schema index
|
||||
int32_t numOfCols;
|
||||
int32_t numOfBound;
|
||||
} SBoundColInfo;
|
||||
|
@ -162,7 +162,7 @@ typedef struct SVgroupDataCxt {
|
|||
|
||||
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
|
||||
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt);
|
||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode);
|
||||
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
|
||||
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
|
||||
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
|
||||
|
|
|
@ -181,7 +181,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
|
|||
|
||||
pBoundInfo->numOfBound = 0;
|
||||
|
||||
int32_t lastColIdx = -1; // last column found
|
||||
int16_t lastColIdx = -1; // last column found
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
while (TSDB_CODE_SUCCESS == code) {
|
||||
SToken token;
|
||||
|
@ -196,8 +196,8 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, b
|
|||
token.z = tmpTokenBuf;
|
||||
token.n = strdequote(token.z);
|
||||
|
||||
int32_t t = lastColIdx + 1;
|
||||
int32_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
|
||||
int16_t t = lastColIdx + 1;
|
||||
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
|
||||
if (index < 0 && t > 0) {
|
||||
index = insFindCol(&token, 0, t, pSchema);
|
||||
}
|
||||
|
@ -886,17 +886,17 @@ static int32_t preParseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModif
|
|||
|
||||
static int32_t getTableDataCxt(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt** pTableCxt) {
|
||||
if (pCxt->pComCxt->async) {
|
||||
uint64_t uid = pStmt->pTableMeta->uid;
|
||||
if (pStmt->usingTableProcessing) {
|
||||
pStmt->pTableMeta->uid = 0;
|
||||
}
|
||||
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), pStmt->pTableMeta,
|
||||
&pStmt->pCreateTblReq, pTableCxt);
|
||||
return insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), pStmt->pTableMeta,
|
||||
&pStmt->pCreateTblReq, pTableCxt, false);
|
||||
}
|
||||
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&pStmt->targetTableName, tbFName);
|
||||
if (pStmt->usingTableProcessing) {
|
||||
pStmt->pTableMeta->uid = 0;
|
||||
}
|
||||
return insGetTableDataCxt(pStmt->pTableBlockHashObj, tbFName, strlen(tbFName), pStmt->pTableMeta,
|
||||
&pStmt->pCreateTblReq, pTableCxt);
|
||||
&pStmt->pCreateTblReq, pTableCxt, NULL != pCxt->pComCxt->pStmtCb);
|
||||
}
|
||||
|
||||
static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, STableDataCxt* pTableCxt) {
|
||||
|
@ -921,6 +921,23 @@ static int32_t parseBoundColumnsClause(SInsertParseContext* pCxt, SVnodeModifOpS
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
|
||||
if (0 == (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) {
|
||||
SSchema* pSchema = pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]];
|
||||
SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1);
|
||||
if (NULL == pCol) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
tColDataInit(pCol, pSchema->colId, pSchema->type, 0);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// input pStmt->pSql:
|
||||
// 1. [(tag1_name, ...)] ...
|
||||
// 2. VALUES ... | FILE ...
|
||||
|
@ -933,6 +950,9 @@ static int32_t parseSchemaClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpS
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = parseBoundColumnsClause(pCxt, pStmt, *pTableCxt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = initTableColSubmitData(*pTableCxt);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1411,7 +1431,6 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
|
|||
}
|
||||
|
||||
static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
#if 0
|
||||
SParsedDataColInfo* tags = taosMemoryMalloc(sizeof(pCxt->tags));
|
||||
if (NULL == tags) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -1426,9 +1445,6 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
|
|||
pStmt->pVgroupsHashObj = NULL;
|
||||
pStmt->pTableBlockHashObj = NULL;
|
||||
return code;
|
||||
#else
|
||||
return TSDB_CODE_FAILED;
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
|
|
|
@ -32,20 +32,25 @@ typedef struct SKvParam {
|
|||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pVgDataBlocks = NULL;
|
||||
SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||
|
||||
// merge according to vgId
|
||||
if (taosHashGetSize(pBlockHash) > 0) {
|
||||
code = insMergeTableDataBlocks(pBlockHash, &pVgDataBlocks);
|
||||
code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = insBuildOutput(pVgHash, pVgDataBlocks, &((SVnodeModifOpStmt*)pQuery->pRoot)->pDataBlocks);
|
||||
code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
|
||||
}
|
||||
|
||||
if (pStmt->freeArrayFunc) {
|
||||
pStmt->freeArrayFunc(pVgDataBlocks);
|
||||
}
|
||||
insDestroyBlockArrayList(pVgDataBlocks);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
|
||||
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
|
||||
|
@ -64,7 +69,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
|
|||
goto end;
|
||||
}
|
||||
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
|
||||
|
||||
bool isJson = false;
|
||||
STag* pTag = NULL;
|
||||
|
@ -136,10 +141,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
|
|||
goto end;
|
||||
}
|
||||
|
||||
SVCreateTbReq tbReq = {0};
|
||||
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags);
|
||||
code = insBuildCreateTbMsg(pDataBlock, &tbReq);
|
||||
tdDestroySVCreateTbReq(&tbReq);
|
||||
insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags);
|
||||
|
||||
end:
|
||||
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
|
||||
|
@ -155,198 +157,90 @@ end:
|
|||
}
|
||||
|
||||
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
|
||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
||||
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
|
||||
SMemParam param = {.rb = pBuilder};
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
int32_t rowNum = bind->num;
|
||||
|
||||
CHECK_CODE(
|
||||
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||
for (int c = 0; c < boundInfo->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
|
||||
SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c);
|
||||
|
||||
CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
|
||||
|
||||
for (int32_t r = 0; r < bind->num; ++r) {
|
||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
|
||||
tdSRowResetBuf(pBuilder, row);
|
||||
|
||||
for (int c = 0; c < spd->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
|
||||
|
||||
if (bind[c].num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
||||
param.schema = pColSchema;
|
||||
insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx);
|
||||
|
||||
if (bind[c].is_null && bind[c].is_null[r]) {
|
||||
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m));
|
||||
} else {
|
||||
if (bind[c].buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
int32_t colLen = pColSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
|
||||
colLen = bind[c].length[r];
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind[c].buffer + bind[c].buffer_length * r, colLen, ¶m));
|
||||
}
|
||||
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
|
||||
TSKEY tsKey = TD_ROW_KEY(row);
|
||||
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
|
||||
}
|
||||
}
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
||||
pBuilder->hasNone = true;
|
||||
}
|
||||
tdSRowEnd(pBuilder);
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
STSchema* pSTSchema = tBuildTSchema(pSchema, spd->numOfCols, 1);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
#endif
|
||||
pDataBlock->size += extendedRowSize;
|
||||
}
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
||||
return insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf);
|
||||
}
|
||||
|
||||
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||
int32_t rowNum) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
|
||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
||||
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
|
||||
SMemParam param = {.rb = pBuilder};
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
bool rowStart = (0 == colIdx);
|
||||
bool rowEnd = ((colIdx + 1) == spd->numOfBound);
|
||||
|
||||
if (rowStart) {
|
||||
CHECK_CODE(
|
||||
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo));
|
||||
CHECK_CODE(insAllocateMemForSize(pDataBlock, extendedRowSize * bind->num));
|
||||
}
|
||||
|
||||
for (int32_t r = 0; r < bind->num; ++r) {
|
||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size + extendedRowSize * r); // skip the SSubmitBlk header
|
||||
if (rowStart) {
|
||||
tdSRowResetBuf(pBuilder, row);
|
||||
} else {
|
||||
tdSRowGetBuf(pBuilder, row);
|
||||
}
|
||||
|
||||
SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx]];
|
||||
|
||||
if (bind->num != rowNum) {
|
||||
if (bind[c].num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
||||
param.schema = pColSchema;
|
||||
insGetSTSRowAppendInfo(pBuilder->rowType, spd, colIdx, ¶m.toffset, ¶m.colIdx);
|
||||
|
||||
if (bind->is_null && bind->is_null[r]) {
|
||||
if (pColSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
return buildInvalidOperationMsg(&pBuf, "primary timestamp should not be NULL");
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, NULL, 0, ¶m));
|
||||
} else {
|
||||
if (bind->buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
int32_t colLen = pColSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
|
||||
colLen = bind->length[r];
|
||||
}
|
||||
|
||||
CHECK_CODE(insMemRowAppend(&pBuf, (char*)bind->buffer + bind->buffer_length * r, colLen, ¶m));
|
||||
if (bind[c].buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
|
||||
TSKEY tsKey = TD_ROW_KEY(row);
|
||||
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
|
||||
}
|
||||
|
||||
// set the null value for the columns that do not assign values
|
||||
if (rowEnd && (spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
||||
pBuilder->hasNone = true;
|
||||
}
|
||||
if (rowEnd) {
|
||||
tdSRowEnd(pBuilder);
|
||||
}
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
if (rowEnd) {
|
||||
STSchema* pSTSchema = tBuildTSchema(pSchema, spd->numOfCols, 1);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if (rowEnd) {
|
||||
pDataBlock->size += extendedRowSize * bind->num;
|
||||
|
||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
||||
CHECK_CODE(insSetBlockInfo(pBlocks, pDataBlock, bind->num, &pBuf));
|
||||
tColDataAddValueByBind(pCol, bind + c);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
|
||||
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||
int32_t rowNum) {
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||
SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
|
||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
|
||||
SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx);
|
||||
|
||||
if (bind[colIdx].num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
||||
if (bind[colIdx].buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
tColDataAddValueByBind(pCol, bind);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildBoundFields(int32_t numOfBound, int16_t boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
|
||||
uint8_t timePrec) {
|
||||
if (fields) {
|
||||
*fields = taosMemoryCalloc(boundInfo->numOfBound, sizeof(TAOS_FIELD));
|
||||
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD));
|
||||
if (NULL == *fields) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSchema* schema = &pSchema[boundInfo->boundColumns[0]];
|
||||
SSchema* schema = &pSchema[boundColumns[0]];
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
|
||||
(*fields)[0].precision = timePrec;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < boundInfo->numOfBound; ++i) {
|
||||
schema = &pSchema[boundInfo->boundColumns[i]];
|
||||
for (int32_t i = 0; i < numOfBound; ++i) {
|
||||
schema = &pSchema[boundColumns[i]];
|
||||
strcpy((*fields)[i].name, schema->name);
|
||||
(*fields)[i].type = schema->type;
|
||||
(*fields)[i].bytes = schema->bytes;
|
||||
}
|
||||
}
|
||||
|
||||
*fieldNum = boundInfo->numOfBound;
|
||||
*fieldNum = numOfBound;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
|
||||
if (NULL == tags) {
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pDataBlock->pTableMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pTableMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||
}
|
||||
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pTableMeta);
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
|
||||
if (tags->numOfBound <= 0) {
|
||||
*fieldNum = 0;
|
||||
*fields = NULL;
|
||||
|
@ -354,15 +248,15 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CHECK_CODE(buildBoundFields(tags, pSchema, fieldNum, fields, 0));
|
||||
CHECK_CODE(buildBoundFields(tags->numOfBound, tags->boundColumns, pSchema, fieldNum, fields, 0));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
|
||||
if (pDataBlock->boundColumnInfo.numOfBound <= 0) {
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
SSchema* pSchema = getTableColumnSchema(pDataBlock->pMeta);
|
||||
if (pDataBlock->boundColsInfo.numOfBound <= 0) {
|
||||
*fieldNum = 0;
|
||||
if (fields) {
|
||||
*fields = NULL;
|
||||
|
@ -371,107 +265,113 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CHECK_CODE(buildBoundFields(&pDataBlock->boundColumnInfo, pSchema, fieldNum, fields,
|
||||
pDataBlock->pTableMeta->tableInfo.precision));
|
||||
CHECK_CODE(buildBoundFields(&pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields,
|
||||
pDataBlock->pMeta->tableInfo.precision));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qResetStmtDataBlock(void* block, bool keepBuf) {
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)block;
|
||||
int32_t qResetStmtDataBlock(void* block, bool deepClear) {
|
||||
STableDataCxt* pBlock = (STableDataCxt*)block;
|
||||
int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
|
||||
|
||||
if (keepBuf) {
|
||||
taosMemoryFreeClear(pBlock->pData);
|
||||
pBlock->pData = taosMemoryMalloc(TSDB_PAYLOAD_SIZE);
|
||||
if (NULL == pBlock->pData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
SColData *pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
|
||||
if (deepClear) {
|
||||
tColDataDeepClear(pCol);
|
||||
} else {
|
||||
tColDataClear(pCol);
|
||||
}
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
} else {
|
||||
pBlock->pData = NULL;
|
||||
}
|
||||
|
||||
pBlock->ordered = true;
|
||||
pBlock->prevTS = INT64_MIN;
|
||||
pBlock->size = sizeof(SSubmitBlk);
|
||||
pBlock->tsSource = -1;
|
||||
pBlock->numOfTables = 1;
|
||||
pBlock->nAllocSize = TSDB_PAYLOAD_SIZE;
|
||||
pBlock->headerSize = pBlock->size;
|
||||
pBlock->createTbReqLen = 0;
|
||||
|
||||
memset(&pBlock->rowBuilder, 0, sizeof(pBlock->rowBuilder));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
|
||||
*pDst = taosMemoryMalloc(sizeof(STableDataBlocks));
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
|
||||
int32_t code = 0;
|
||||
|
||||
*pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(*pDst, pSrc, sizeof(STableDataBlocks));
|
||||
((STableDataBlocks*)(*pDst))->cloned = true;
|
||||
STableDataCxt* pNewCxt = (STableDataCxt*)*pDst;
|
||||
STableDataCxt* pCxt = (STableDataCxt*)pSrc;
|
||||
pNewCxt->pSchema = NULL;
|
||||
pNewCxt->pValues = NULL;
|
||||
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)(*pDst);
|
||||
if (pBlock->pTableMeta) {
|
||||
void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pBlock->pTableMeta));
|
||||
if (pCxt->pMeta) {
|
||||
void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pCxt->pMeta));
|
||||
if (NULL == pNewMeta) {
|
||||
taosMemoryFreeClear(*pDst);
|
||||
insDestroyTableDataCxt(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pNewMeta, pBlock->pTableMeta, TABLE_META_SIZE(pBlock->pTableMeta));
|
||||
pBlock->pTableMeta = pNewMeta;
|
||||
memcpy(pNewMeta, pCxt->pMeta, TABLE_META_SIZE(pCxt->pMeta));
|
||||
pNewCxt->pMeta = pNewMeta;
|
||||
}
|
||||
|
||||
return qResetStmtDataBlock(*pDst, false);
|
||||
memcpy(&pNewCxt->boundColsInfo, &pCxt->boundColsInfo, sizeof(pCxt->boundColsInfo));
|
||||
pNewCxt->boundColsInfo.pColIndex = NULL;
|
||||
|
||||
if (pCxt->boundColsInfo.pColIndex) {
|
||||
void* pNewColIdx = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
|
||||
if (NULL == pNewColIdx) {
|
||||
insDestroyTableDataCxt(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(pNewColIdx, pCxt->boundColsInfo.pColIndex, pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
|
||||
pNewCxt->boundColsInfo.pColIndex = pNewColIdx;
|
||||
}
|
||||
|
||||
if (pCxt->pData) {
|
||||
SSubmitTbData *pNewTb = (SSubmitTbData*)taosMemoryMalloc(sizeof(SSubmitTbData));
|
||||
if (NULL == pNewTb) {
|
||||
insDestroyTableDataCxt(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(pNewTb, pCxt->pData, sizeof(*pCxt->pData));
|
||||
pNewTb->pCreateTbReq = NULL;
|
||||
|
||||
pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL);
|
||||
if (NULL == pNewTb) {
|
||||
insDestroyTableDataCxt(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (reset) {
|
||||
code = qResetStmtDataBlock(*pDst, true);
|
||||
}
|
||||
|
||||
pNewCxt->pData = pNewTb;
|
||||
}
|
||||
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STableDataBlocks* pBlock = (STableDataBlocks*)*pDst;
|
||||
pBlock->pData = taosMemoryMalloc(pBlock->nAllocSize);
|
||||
if (NULL == pBlock->pData) {
|
||||
qFreeStmtDataBlock(pBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
STableDataCxt* pBlock = (STableDataCxt*)*pDst;
|
||||
if (pBlock->pMeta) {
|
||||
pBlock->pMeta->uid = uid;
|
||||
pBlock->pMeta->vgId = vgId;
|
||||
}
|
||||
|
||||
pBlock->vgId = vgId;
|
||||
|
||||
if (pBlock->pTableMeta) {
|
||||
pBlock->pTableMeta->uid = uid;
|
||||
pBlock->pTableMeta->vgId = vgId;
|
||||
}
|
||||
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataBlocks*)pDataBlock)->pTableMeta; }
|
||||
|
||||
void qFreeStmtDataBlock(void* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
|
||||
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
|
||||
taosMemoryFreeClear(pDataBlock);
|
||||
}
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
|
||||
|
||||
void qDestroyStmtDataBlock(void* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
|
||||
|
||||
pDataBlock->cloned = false;
|
||||
insDestroyDataBlock(pDataBlock);
|
||||
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
|
||||
insDestroyTableDataCxt(pDataBlock);
|
||||
}
|
||||
|
|
|
@ -839,7 +839,7 @@ int32_t insCreateSName(SName* pName, SToken* pTableName, int32_t acctId, const c
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t insFindCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
|
||||
int16_t insFindCol(SToken* pColname, int16_t start, int16_t end, SSchema* pSchema) {
|
||||
while (start < end) {
|
||||
if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
|
||||
return start;
|
||||
|
@ -956,7 +956,7 @@ int32_t insBuildOutput(SHashObj* pVgroupsHashObj, SArray* pVgDataBlocks, SArray*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void initBoundCols(int32_t ncols, int32_t* pBoundCols) {
|
||||
static void initBoundCols(int32_t ncols, int16_t* pBoundCols) {
|
||||
for (int32_t i = 0; i < ncols; ++i) {
|
||||
pBoundCols[i] = i;
|
||||
}
|
||||
|
@ -973,7 +973,7 @@ static void initColValues(STableMeta* pTableMeta, SArray* pValues) {
|
|||
int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
|
||||
pInfo->numOfCols = numOfBound;
|
||||
pInfo->numOfBound = numOfBound;
|
||||
pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int32_t));
|
||||
pInfo->pColIndex = taosMemoryCalloc(numOfBound, sizeof(int16_t));
|
||||
if (NULL == pInfo->pColIndex) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -983,7 +983,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo* pInfo) {
|
|||
|
||||
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
|
||||
|
||||
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput) {
|
||||
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput, bool colMode) {
|
||||
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));
|
||||
if (NULL == pTableCxt) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1019,14 +1019,22 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
|||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pTableCxt->pData->flags = NULL != *pCreateTbReq ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0;
|
||||
pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0;
|
||||
pTableCxt->pData->suid = pTableMeta->suid;
|
||||
pTableCxt->pData->uid = pTableMeta->uid;
|
||||
pTableCxt->pData->sver = pTableMeta->sversion;
|
||||
pTableCxt->pData->pCreateTbReq = *pCreateTbReq;
|
||||
*pCreateTbReq = NULL;
|
||||
pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
|
||||
if (NULL == pTableCxt->pData->aRowP) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData));
|
||||
if (NULL == pTableCxt->pData->aCol) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
} else {
|
||||
pTableCxt->pData->aRowP = taosArrayInit(128, POINTER_BYTES);
|
||||
if (NULL == pTableCxt->pData->aRowP) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1041,12 +1049,12 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
|||
}
|
||||
|
||||
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
|
||||
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt) {
|
||||
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) {
|
||||
*pTableCxt = taosHashGet(pHash, id, idLen);
|
||||
if (NULL != *pTableCxt) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt);
|
||||
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = taosHashPut(pHash, id, idLen, pTableCxt, POINTER_BYTES);
|
||||
}
|
||||
|
@ -1162,6 +1170,18 @@ static int32_t createVgroupDataCxt(STableDataCxt* pTableCxt, SHashObj* pVgroupHa
|
|||
return code;
|
||||
}
|
||||
|
||||
int insColDataComp(const void* lp, const void* rp) {
|
||||
SColData* pLeft = (SColData*)lp;
|
||||
SColData* pRight = (SColData*)rp;
|
||||
if (pLeft->cid < pRight->cid) {
|
||||
return -1;
|
||||
} else if (pLeft->cid > pRight->cid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
||||
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
|
||||
|
@ -1172,11 +1192,31 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
|||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
bool colFormat = false;
|
||||
|
||||
void* p = taosHashIterate(pTableHash, NULL);
|
||||
if (p) {
|
||||
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
|
||||
colFormat = (0 != (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT));
|
||||
}
|
||||
|
||||
while (TSDB_CODE_SUCCESS == code && NULL != p) {
|
||||
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
|
||||
code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
|
||||
if (colFormat) {
|
||||
taosArraySort(pTableCxt->pData->aCol, insColDataComp);
|
||||
|
||||
int32_t colNum = taosArrayGetSize(pTableCxt->pData->aCol);
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
SColData *pCol = taosArrayGet(pTableCxt->pData->aCol, i);
|
||||
code = tColDataSortMerge(pCol);
|
||||
if (code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
code = tRowMergeSort(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SVgroupDataCxt* pVgCxt = NULL;
|
||||
int32_t vgId = pTableCxt->pMeta->vgId;
|
||||
|
|
Loading…
Reference in New Issue