enh: support column mode write

This commit is contained in:
dapan1121 2022-11-30 19:08:48 +08:00
parent 366f246fc6
commit f7d94f73f5
6 changed files with 19 additions and 25 deletions

View File

@ -86,7 +86,7 @@ void qCleanupKeywordsTable();
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(void* block, bool keepBuf); int32_t qResetStmtDataBlock(void* block, bool keepBuf);
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId); int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId);
void qDestroyStmtDataBlock(void* pBlock); void qDestroyStmtDataBlock(void* pBlock);
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock); STableMeta* qGetTableMetaInDataBlock(void* pDataBlock);

View File

@ -214,16 +214,16 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (!pSrc) { if (!pSrc) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
STableDataBlocks* pDst = NULL; STableDataCxt* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc)); STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));
SStmtTableCache cache = { SStmtTableCache cache = {
.pDataBlock = pDst, .pDataCtx = pDst,
.boundTags = pStmt->bInfo.boundTags, .boundTags = pStmt->bInfo.boundTags,
}; };
@ -289,7 +289,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
size_t keyLen = 0; size_t keyLen = 0;
void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) { while (pIter) {
STableDataCxt* pBlocks = *(STableDataCxt*)pIter; STableDataCxt* pBlocks = *(STableDataCxt**)pIter;
char* key = taosHashGetKey(pIter, &keyLen); char* key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
@ -333,7 +333,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
while (pIter) { while (pIter) {
SStmtTableCache* pCache = (SStmtTableCache*)pIter; SStmtTableCache* pCache = (SStmtTableCache*)pIter;
qDestroyStmtDataBlock(pCache->pDataBlock); qDestroyStmtDataBlock(pCache->pDataCtx);
destroyBoundColumnInfo(pCache->boundTags); destroyBoundColumnInfo(pCache->boundTags);
taosMemoryFreeClear(pCache->boundTags); taosMemoryFreeClear(pCache->boundTags);
@ -775,9 +775,9 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
int32_t code = 0; int32_t code = 0;
int32_t finalCode = 0; int32_t finalCode = 0;
size_t keyLen = 0; size_t keyLen = 0;
STableDataCxt** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) { while (pIter) {
STableDataBlocks* pBlock = *pIter; STableDataCxt* pBlock = *(STableDataCxt**)pIter;
char* key = taosHashGetKey(pIter, &keyLen); char* key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);

View File

@ -133,7 +133,7 @@ int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks);
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
int32_t insFindCol(struct SToken *pColname, int32_t start, int32_t end, SSchema *pSchema); int16_t insFindCol(struct SToken *pColname, int16_t start, int16_t end, SSchema *pSchema);
void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname, void insBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, const char *sname,
SArray *tagName, uint8_t tagNum, int32_t ttl); SArray *tagName, uint8_t tagNum, int32_t ttl);
int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param); int32_t insMemRowAppend(SMsgBuf *pMsgBuf, const void *value, int32_t len, void *param);
@ -173,5 +173,6 @@ void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt); void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
void insDestroyVgroupDataCxtList(SArray *pVgCxtList); void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash); void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash);
void insDestroyTableDataCxt(STableDataCxt* pTableCxt);
#endif // TDENGINE_PAR_INSERT_UTIL_H #endif // TDENGINE_PAR_INSERT_UTIL_H

View File

@ -928,7 +928,7 @@ int32_t initTableColSubmitData(STableDataCxt* pTableCxt) {
} }
for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) { for (int32_t i = 0; i < pTableCxt->boundColsInfo.numOfBound; ++i) {
SSchema* pSchema = pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]]; SSchema* pSchema = &pTableCxt->pMeta->schema[pTableCxt->boundColsInfo.pColIndex[i]];
SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1); SColData* pCol = taosArrayReserve(pTableCxt->pData->aCol, 1);
if (NULL == pCol) { if (NULL == pCol) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1193,7 +1193,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataC
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1); SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow); code = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -141,7 +141,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
goto end; goto end;
} }
insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags); insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
end: end:
for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) { for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
@ -203,7 +203,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildBoundFields(int32_t numOfBound, int16_t boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields, int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
uint8_t timePrec) { uint8_t timePrec) {
if (fields) { if (fields) {
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD)); *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD));
@ -265,7 +265,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
CHECK_CODE(buildBoundFields(&pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields, CHECK_CODE(buildBoundFields(pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields,
pDataBlock->pMeta->tableInfo.precision)); pDataBlock->pMeta->tableInfo.precision));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -338,12 +338,12 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
insDestroyTableDataCxt(*pDst); insDestroyTableDataCxt(*pDst);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pNewCxt->pData = pNewTb;
if (reset) { if (reset) {
code = qResetStmtDataBlock(*pDst, true); code = qResetStmtDataBlock(*pDst, true);
} }
pNewCxt->pData = pNewTb;
} }

View File

@ -1230,14 +1230,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
if (colFormat) { if (colFormat) {
taosArraySort(pTableCxt->pData->aCol, insColDataComp); taosArraySort(pTableCxt->pData->aCol, insColDataComp);
int32_t colNum = taosArrayGetSize(pTableCxt->pData->aCol); tColDataSortMerge(pTableCxt->pData->aCol);
for (int32_t i = 0; i < colNum; ++i) {
SColData *pCol = taosArrayGet(pTableCxt->pData->aCol, i);
code = tColDataSortMerge(pCol);
if (code) {
break;
}
}
} else { } else {
if (!pTableCxt->ordered) { if (!pTableCxt->ordered) {
tRowSort(pTableCxt->pData->aRowP); tRowSort(pTableCxt->pData->aRowP);