Merge branch 'refact/submit_req' of github.com:taosdata/TDengine into refact/submit_req
This commit is contained in:
commit
db15b0e1b0
|
@ -85,11 +85,12 @@ int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid);
|
|||
void qCleanupKeywordsTable();
|
||||
|
||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
|
||||
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset);
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId);
|
||||
void qDestroyStmtDataBlock(void* pBlock);
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock);
|
||||
int32_t qResetStmtDataBlock(STableDataCxt* block, bool keepBuf);
|
||||
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset);
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb);
|
||||
void qDestroyStmtDataBlock(STableDataCxt* pBlock);
|
||||
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock);
|
||||
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData);
|
||||
|
||||
int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx);
|
||||
int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
|
||||
|
|
|
@ -163,6 +163,25 @@ typedef struct STargetInfo {
|
|||
int32_t vgId;
|
||||
} STargetInfo;
|
||||
|
||||
|
||||
typedef struct SBoundColInfo {
|
||||
int16_t *pColIndex; // bound index => schema index
|
||||
int32_t numOfCols;
|
||||
int32_t numOfBound;
|
||||
} SBoundColInfo;
|
||||
|
||||
|
||||
typedef struct STableDataCxt {
|
||||
STableMeta *pMeta;
|
||||
STSchema *pSchema;
|
||||
SBoundColInfo boundColsInfo;
|
||||
SArray *pValues;
|
||||
SSubmitTbData *pData;
|
||||
TSKEY lastTs;
|
||||
bool ordered;
|
||||
bool duplicateTs;
|
||||
} STableDataCxt;
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
|
||||
typedef int32_t (*__async_exec_fn_t)(void* param);
|
||||
|
||||
|
@ -238,6 +257,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
char* parseTagDatatoJson(void* p);
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
||||
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
|
||||
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
|
||||
void* (*mallocFp)(int64_t));
|
||||
|
|
|
@ -21,8 +21,6 @@ extern "C" {
|
|||
#endif
|
||||
#include "catalog.h"
|
||||
|
||||
typedef void STableDataCxt;
|
||||
|
||||
typedef enum {
|
||||
STMT_TYPE_INSERT = 1,
|
||||
STMT_TYPE_MULTI_INSERT,
|
||||
|
@ -71,10 +69,11 @@ typedef struct SStmtBindInfo {
|
|||
} SStmtBindInfo;
|
||||
|
||||
typedef struct SStmtExecInfo {
|
||||
int32_t affectedRows;
|
||||
SRequestObj *pRequest;
|
||||
SHashObj *pBlockHash;
|
||||
bool autoCreateTbl;
|
||||
int32_t affectedRows;
|
||||
SRequestObj *pRequest;
|
||||
SHashObj *pBlockHash;
|
||||
STableDataCxt *pCurrBlock;
|
||||
SSubmitTbData *pCurrTbData;
|
||||
} SStmtExecInfo;
|
||||
|
||||
typedef struct SStmtSQLInfo {
|
||||
|
|
|
@ -171,12 +171,11 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) {
|
||||
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
pStmt->sql.pVgHash = pVgHash;
|
||||
pStmt->exec.pBlockHash = pBlockHash;
|
||||
pStmt->exec.autoCreateTbl = autoCreateTbl;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -186,7 +185,7 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SNam
|
|||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl));
|
||||
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl));
|
||||
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
|
||||
|
||||
pStmt->sql.autoCreateTbl = autoCreateTbl;
|
||||
|
||||
|
@ -293,14 +292,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
|||
char* key = taosHashGetKey(pIter, &keyLen);
|
||||
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
|
||||
|
||||
/*
|
||||
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
|
||||
if (keepTable && pBlocks == pStmt->exec.pCurrBlock) {
|
||||
ASSERT(NULL == pBlocks->pData);
|
||||
TSWAP(pBlocks->pData, pStmt->exec.pCurrTbData);
|
||||
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
|
||||
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
continue;
|
||||
}
|
||||
*/
|
||||
|
||||
qDestroyStmtDataBlock(pBlocks);
|
||||
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
|
||||
|
@ -308,12 +307,15 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
|
|||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
}
|
||||
|
||||
pStmt->exec.autoCreateTbl = false;
|
||||
|
||||
if (!keepTable) {
|
||||
taosHashCleanup(pStmt->exec.pBlockHash);
|
||||
pStmt->exec.pBlockHash = NULL;
|
||||
if (keepTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosHashCleanup(pStmt->exec.pBlockHash);
|
||||
pStmt->exec.pBlockHash = NULL;
|
||||
|
||||
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
|
||||
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
|
||||
|
||||
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
|
||||
|
||||
|
@ -362,7 +364,7 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableD
|
|||
STMT_ERR_RET(
|
||||
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
|
||||
|
||||
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId));
|
||||
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId, pStmt->sql.autoCreateTbl));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -371,11 +373,13 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
pStmt->bInfo.needParse = true;
|
||||
pStmt->bInfo.inExecCache = false;
|
||||
|
||||
STableDataCxt* pCxtInExec =
|
||||
STableDataCxt** pCxtInExec =
|
||||
taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (pCxtInExec) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
pStmt->bInfo.inExecCache = true;
|
||||
|
||||
pStmt->exec.pCurrBlock = *pCxtInExec;
|
||||
|
||||
if (pStmt->sql.autoCreateTbl) {
|
||||
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
|
||||
|
@ -403,8 +407,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
|
||||
if (pCache) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
pStmt->exec.autoCreateTbl = true;
|
||||
|
||||
pStmt->bInfo.tbUid = 0;
|
||||
|
||||
STableDataCxt* pNewBlock = NULL;
|
||||
|
@ -415,6 +417,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pStmt->exec.pCurrBlock = pNewBlock;
|
||||
|
||||
tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -493,6 +497,8 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pStmt->exec.pCurrBlock = pNewBlock;
|
||||
|
||||
tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -590,6 +596,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
|||
STMT_ERR_RET(stmtGetFromCache(pStmt));
|
||||
|
||||
if (pStmt->bInfo.needParse) {
|
||||
pStmt->exec.pCurrBlock = NULL;
|
||||
|
||||
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
|
||||
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
|
||||
|
||||
|
@ -622,8 +630,6 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
|
|||
pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
|
||||
pStmt->exec.pRequest->msgBufLen));
|
||||
|
||||
pStmt->exec.autoCreateTbl = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -725,11 +731,17 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableDataCxt** pDataBlock =
|
||||
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
STableDataCxt** pDataBlock = NULL;
|
||||
|
||||
if (pStmt->exec.pCurrBlock) {
|
||||
pDataBlock = &pStmt->exec.pCurrBlock;
|
||||
} else {
|
||||
pDataBlock= (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
pStmt->exec.pCurrBlock = *pDataBlock;
|
||||
}
|
||||
|
||||
if (colIdx < 0) {
|
||||
|
@ -857,7 +869,6 @@ int stmtExec(TAOS_STMT* stmt) {
|
|||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
int32_t code = 0;
|
||||
SSubmitRsp* pRsp = NULL;
|
||||
bool autoCreateTbl = pStmt->exec.autoCreateTbl;
|
||||
|
||||
STMT_DLOG_E("start to exec");
|
||||
|
||||
|
@ -866,8 +877,13 @@ int stmtExec(TAOS_STMT* stmt) {
|
|||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
||||
} else {
|
||||
tDestroySSubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
|
||||
taosMemoryFreeClear(pStmt->exec.pCurrTbData);
|
||||
|
||||
STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));
|
||||
|
||||
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
|
||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, (autoCreateTbl ? (void**)&pRsp : NULL));
|
||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
|
||||
}
|
||||
|
||||
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
||||
|
@ -890,15 +906,6 @@ _return:
|
|||
|
||||
stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && autoCreateTbl) {
|
||||
if (NULL == pRsp) {
|
||||
tscError("no submit resp got for auto create table");
|
||||
code = TSDB_CODE_TSC_APP_ERROR;
|
||||
} else {
|
||||
code = stmtUpdateTableUid(pStmt, pRsp);
|
||||
}
|
||||
}
|
||||
|
||||
tFreeSSubmitRsp(pRsp);
|
||||
|
||||
++pStmt->sql.runTimes;
|
||||
|
|
|
@ -6867,11 +6867,16 @@ _exit:
|
|||
}
|
||||
|
||||
void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
|
||||
if (pTbData->pCreateTbReq) {
|
||||
taosMemoryFree(pTbData->pCreateTbReq);
|
||||
if (NULL == pTbData) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (flag == TSDB_MSG_FLG_ENCODE) {
|
||||
if (pTbData->pCreateTbReq) {
|
||||
tdDestroySVCreateTbReq(pTbData->pCreateTbReq);
|
||||
taosMemoryFree(pTbData->pCreateTbReq);
|
||||
}
|
||||
|
||||
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
int32_t nColData = TARRAY_SIZE(pTbData->aCol);
|
||||
SColData *aColData = (SColData *)TARRAY_DATA(pTbData->aCol);
|
||||
|
@ -6890,6 +6895,10 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
|
|||
taosArrayDestroy(pTbData->aRowP);
|
||||
}
|
||||
} else if (flag == TSDB_MSG_FLG_DECODE) {
|
||||
if (pTbData->pCreateTbReq) {
|
||||
taosMemoryFree(pTbData->pCreateTbReq);
|
||||
}
|
||||
|
||||
if (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
taosArrayDestroy(pTbData->aCol);
|
||||
} else {
|
||||
|
|
|
@ -141,23 +141,6 @@ int32_t insCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start);
|
|||
int32_t insBuildOutput(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
|
||||
void insDestroyDataBlock(STableDataBlocks *pDataBlock);
|
||||
|
||||
typedef struct SBoundColInfo {
|
||||
int16_t *pColIndex; // bound index => schema index
|
||||
int32_t numOfCols;
|
||||
int32_t numOfBound;
|
||||
} SBoundColInfo;
|
||||
|
||||
typedef struct STableDataCxt {
|
||||
STableMeta *pMeta;
|
||||
STSchema *pSchema;
|
||||
SBoundColInfo boundColsInfo;
|
||||
SArray *pValues;
|
||||
SSubmitTbData *pData;
|
||||
TSKEY lastTs;
|
||||
bool ordered;
|
||||
bool duplicateTs;
|
||||
} STableDataCxt;
|
||||
|
||||
typedef struct SVgroupDataCxt {
|
||||
int32_t vgId;
|
||||
SSubmitReq2 *pData;
|
||||
|
|
|
@ -296,7 +296,6 @@ SQuery* smlInitHandle() {
|
|||
qDestroyQuery(pQuery);
|
||||
return NULL;
|
||||
}
|
||||
stmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
|
||||
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
|
||||
|
|
|
@ -22,6 +22,36 @@
|
|||
#include "ttime.h"
|
||||
#include "ttypes.h"
|
||||
|
||||
typedef struct SKvParam {
|
||||
int16_t pos;
|
||||
SArray* pTagVals;
|
||||
SSchema* schema;
|
||||
char buf[TSDB_MAX_TAGS_LEN];
|
||||
} SKvParam;
|
||||
|
||||
int32_t qCloneCurrentTbData(STableDataCxt* pDataBlock, SSubmitTbData **pData) {
|
||||
*pData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
|
||||
if (NULL == *pData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SSubmitTbData *pNew = *pData;
|
||||
|
||||
*pNew = *pDataBlock->pData;
|
||||
|
||||
cloneSVreateTbReq(pDataBlock->pData->pCreateTbReq, &pNew->pCreateTbReq);
|
||||
pNew->aCol = taosArrayDup(pDataBlock->pData->aCol, NULL);
|
||||
|
||||
int32_t colNum = taosArrayGetSize(pNew->aCol);
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
SColData *pCol = (SColData*)taosArrayGet(pNew->aCol, i);
|
||||
tColDataDeepClear(pCol);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pVgDataBlocks = NULL;
|
||||
|
@ -134,6 +164,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (NULL == pDataBlock->pData->pCreateTbReq) {
|
||||
pDataBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == pDataBlock->pData->pCreateTbReq) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
|
||||
|
||||
end:
|
||||
|
@ -285,7 +323,7 @@ _return:
|
|||
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
|
||||
uint8_t timePrec) {
|
||||
if (fields) {
|
||||
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD));
|
||||
*fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD_E));
|
||||
if (NULL == *fields) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -350,7 +388,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fiel
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qResetStmtDataBlock(void* block, bool deepClear) {
|
||||
int32_t qResetStmtDataBlock(STableDataCxt* block, bool deepClear) {
|
||||
STableDataCxt* pBlock = (STableDataCxt*)block;
|
||||
int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
|
||||
|
||||
|
@ -366,7 +404,7 @@ int32_t qResetStmtDataBlock(void* block, bool deepClear) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
|
||||
int32_t qCloneStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, bool reset) {
|
||||
int32_t code = 0;
|
||||
|
||||
*pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
|
||||
|
@ -429,7 +467,7 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
|
||||
int32_t qRebuildStmtDataBlock(STableDataCxt** pDst, STableDataCxt* pSrc, uint64_t uid, int32_t vgId, bool rebuildCreateTb) {
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -441,12 +479,19 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgI
|
|||
pBlock->pMeta->vgId = vgId;
|
||||
}
|
||||
|
||||
if (rebuildCreateTb && NULL == pBlock->pData->pCreateTbReq) {
|
||||
pBlock->pData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == pBlock->pData->pCreateTbReq) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
|
||||
STableMeta* qGetTableMetaInDataBlock(STableDataCxt* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }
|
||||
|
||||
void qDestroyStmtDataBlock(void* pBlock) {
|
||||
void qDestroyStmtDataBlock(STableDataCxt* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -1124,6 +1124,7 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt* pVgCxt) {
|
|||
}
|
||||
|
||||
tDestroySSubmitReq2(pVgCxt->pData, TSDB_MSG_FLG_ENCODE);
|
||||
taosMemoryFree(pVgCxt->pData);
|
||||
taosMemoryFree(pVgCxt);
|
||||
}
|
||||
|
||||
|
@ -1240,6 +1241,16 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
|||
while (TSDB_CODE_SUCCESS == code && NULL != p) {
|
||||
STableDataCxt* pTableCxt = *(STableDataCxt**)p;
|
||||
if (colFormat) {
|
||||
SColData *pCol = taosArrayGet(pTableCxt->pData->aCol, 0);
|
||||
if (pCol->nVal <= 0) {
|
||||
p = taosHashIterate(pTableHash, p);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTableCxt->pData->pCreateTbReq) {
|
||||
pTableCxt->pData->flags |= SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
}
|
||||
|
||||
taosArraySort(pTableCxt->pData->aCol, insColDataComp);
|
||||
|
||||
tColDataSortMerge(pTableCxt->pData->aCol);
|
||||
|
@ -1274,7 +1285,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pVgDataBlocks = pVgroupList;
|
||||
} else {
|
||||
taosArrayDestroy(pVgroupList);
|
||||
insDestroyVgroupDataCxtList(pVgroupList);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -244,6 +244,7 @@ void destroyQueryExecRes(SExecResult* pRes) {
|
|||
}
|
||||
case TDMT_VND_SUBMIT: {
|
||||
tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
|
||||
taosMemoryFreeClear(pRes->res);
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_QUERY:
|
||||
|
@ -486,3 +487,55 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
|
||||
if (NULL == pSrc) {
|
||||
*pDst = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
(*pDst)->flags = pSrc->flags;
|
||||
if (pSrc->name) {
|
||||
(*pDst)->name = strdup(pSrc->name);
|
||||
}
|
||||
(*pDst)->uid = pSrc->uid;
|
||||
(*pDst)->ctime = pSrc->ctime;
|
||||
(*pDst)->ttl = pSrc->ttl;
|
||||
(*pDst)->commentLen = pSrc->commentLen;
|
||||
if (pSrc->comment) {
|
||||
(*pDst)->comment = strdup(pSrc->comment);
|
||||
}
|
||||
(*pDst)->type = pSrc->type;
|
||||
|
||||
if (pSrc->type == TSDB_CHILD_TABLE) {
|
||||
if (pSrc->ctb.stbName) {
|
||||
(*pDst)->ctb.stbName = strdup(pSrc->ctb.stbName);
|
||||
}
|
||||
(*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
|
||||
(*pDst)->ctb.suid = pSrc->ctb.suid;
|
||||
if (pSrc->ctb.tagName) {
|
||||
(*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
|
||||
}
|
||||
STag* pTag = (STag *)pSrc->ctb.pTag;
|
||||
if (pTag) {
|
||||
(*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
|
||||
memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
|
||||
}
|
||||
} else {
|
||||
(*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
|
||||
(*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols;
|
||||
if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
|
||||
(*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
|
||||
memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -262,9 +262,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
code = tDecodeSSubmitRsp2(&coder, rsp);
|
||||
tDecoderClear(&coder);
|
||||
if (code) {
|
||||
SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code);
|
||||
tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
|
||||
taosMemoryFree(rsp);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
|
@ -281,11 +283,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
if (sum->aCreateTbRsp) {
|
||||
taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp);
|
||||
taosArrayDestroy(rsp->aCreateTbRsp);
|
||||
taosMemoryFree(rsp);
|
||||
} else {
|
||||
TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
|
||||
taosMemoryFree(rsp);
|
||||
}
|
||||
taosMemoryFree(rsp);
|
||||
} else {
|
||||
pJob->execRes.res = rsp;
|
||||
pJob->execRes.msgType = TDMT_VND_SUBMIT;
|
||||
|
@ -301,6 +302,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||
tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
|
||||
taosMemoryFree(rsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1120,7 +1120,7 @@ int32_t WCSPatternMatch(const TdUcs4 *patterStr, const TdUcs4 *str, size_t size,
|
|||
return TSDB_PATTERN_NOMATCH;
|
||||
}
|
||||
|
||||
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
|
||||
return (j >= size || str[j] == 0) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
|
||||
}
|
||||
|
||||
int32_t compareStrRegexCompMatch(const void *pLeft, const void *pRight) { return compareStrRegexComp(pLeft, pRight); }
|
||||
|
|
|
@ -219,7 +219,7 @@ typedef struct {
|
|||
int32_t caseRunNum; // total run case num
|
||||
} CaseCtrl;
|
||||
|
||||
#if 1
|
||||
#if 0
|
||||
CaseCtrl gCaseCtrl = {
|
||||
.precision = TIME_PRECISION_MICRO,
|
||||
.bindNullNum = 0,
|
||||
|
@ -244,7 +244,7 @@ CaseCtrl gCaseCtrl = {
|
|||
.funcIdxList = NULL,
|
||||
.checkParamNum = false,
|
||||
.runTimes = 0,
|
||||
.caseIdx = 0,
|
||||
.caseIdx = 22,
|
||||
.caseNum = 1,
|
||||
.caseRunIdx = -1,
|
||||
.caseRunNum = -1,
|
||||
|
@ -252,7 +252,7 @@ CaseCtrl gCaseCtrl = {
|
|||
#endif
|
||||
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
CaseCtrl gCaseCtrl = { // default
|
||||
.precision = TIME_PRECISION_MILLI,
|
||||
.bindNullNum = 0,
|
||||
|
@ -2749,7 +2749,6 @@ void runAll(TAOS *taos) {
|
|||
printf("%s Begin\n", gCaseCtrl.caseCatalog);
|
||||
runCaseList(taos);
|
||||
|
||||
#if 0
|
||||
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
|
||||
printf("%s Begin\n", gCaseCtrl.caseCatalog);
|
||||
gCaseCtrl.precision = TIME_PRECISION_MICRO;
|
||||
|
@ -2805,7 +2804,6 @@ void runAll(TAOS *taos) {
|
|||
gCaseCtrl.bindColNum = 6;
|
||||
runCaseList(taos);
|
||||
gCaseCtrl.bindColNum = 0;
|
||||
#endif
|
||||
|
||||
/*
|
||||
strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");
|
||||
|
|
Loading…
Reference in New Issue