stmt auto create table
This commit is contained in:
parent
97f64a58ee
commit
66bcbbba47
|
@ -89,7 +89,7 @@ extern char *qtypeStr[];
|
||||||
|
|
||||||
#define TSDB_PORT_HTTP 11
|
#define TSDB_PORT_HTTP 11
|
||||||
|
|
||||||
#undef TD_DEBUG_PRINT_ROW
|
#define TD_DEBUG_PRINT_ROW
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,10 +254,7 @@ int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t hashMeta;
|
int8_t hashMeta;
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
union {
|
char* tblFName;
|
||||||
char* ename; // used for encode
|
|
||||||
const char* dname; // used for decode
|
|
||||||
};
|
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int32_t affectedRows;
|
int32_t affectedRows;
|
||||||
} SSubmitBlkRsp;
|
} SSubmitBlkRsp;
|
||||||
|
@ -274,6 +271,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
|
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
|
||||||
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
|
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
|
||||||
|
void tFreeSSubmitRsp(SSubmitRsp *pRsp);
|
||||||
|
|
||||||
#define SCHEMA_SMA_ON 0x1
|
#define SCHEMA_SMA_ON 0x1
|
||||||
#define SCHEMA_IDX_ON 0x2
|
#define SCHEMA_IDX_ON 0x2
|
||||||
|
|
|
@ -56,6 +56,7 @@ typedef struct SQueryResult {
|
||||||
uint64_t numOfRows;
|
uint64_t numOfRows;
|
||||||
int32_t msgSize;
|
int32_t msgSize;
|
||||||
char *msg;
|
char *msg;
|
||||||
|
void *res;
|
||||||
} SQueryResult;
|
} SQueryResult;
|
||||||
|
|
||||||
typedef struct STaskInfo {
|
typedef struct STaskInfo {
|
||||||
|
@ -71,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
|
||||||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
|
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the query job, generated according to the query physical plan.
|
* Process the query job, generated according to the query physical plan.
|
||||||
|
|
|
@ -307,9 +307,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
|
||||||
// --- mq
|
// --- mq
|
||||||
void hbMgrInitMqHbRspHandle();
|
void hbMgrInitMqHbRspHandle();
|
||||||
|
|
||||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
|
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res);
|
||||||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
|
||||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -55,6 +55,7 @@ typedef struct SStmtQueryResInfo {
|
||||||
|
|
||||||
typedef struct SStmtBindInfo {
|
typedef struct SStmtBindInfo {
|
||||||
bool needParse;
|
bool needParse;
|
||||||
|
bool inExecCache;
|
||||||
uint64_t tbUid;
|
uint64_t tbUid;
|
||||||
uint64_t tbSuid;
|
uint64_t tbSuid;
|
||||||
int32_t sBindRowNum;
|
int32_t sBindRowNum;
|
||||||
|
@ -64,6 +65,7 @@ typedef struct SStmtBindInfo {
|
||||||
void* boundTags;
|
void* boundTags;
|
||||||
char tbName[TSDB_TABLE_FNAME_LEN];;
|
char tbName[TSDB_TABLE_FNAME_LEN];;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
char stbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
SName sname;
|
SName sname;
|
||||||
} SStmtBindInfo;
|
} SStmtBindInfo;
|
||||||
|
|
||||||
|
@ -72,12 +74,12 @@ typedef struct SStmtExecInfo {
|
||||||
SRequestObj* pRequest;
|
SRequestObj* pRequest;
|
||||||
SHashObj* pVgHash;
|
SHashObj* pVgHash;
|
||||||
SHashObj* pBlockHash;
|
SHashObj* pBlockHash;
|
||||||
|
bool autoCreateTbl;
|
||||||
} SStmtExecInfo;
|
} SStmtExecInfo;
|
||||||
|
|
||||||
typedef struct SStmtSQLInfo {
|
typedef struct SStmtSQLInfo {
|
||||||
STMT_TYPE type;
|
STMT_TYPE type;
|
||||||
STMT_STATUS status;
|
STMT_STATUS status;
|
||||||
bool autoCreateTbl;
|
|
||||||
uint64_t runTimes;
|
uint64_t runTimes;
|
||||||
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
||||||
SQuery* pQuery;
|
SQuery* pQuery;
|
||||||
|
@ -86,6 +88,7 @@ typedef struct SStmtSQLInfo {
|
||||||
SArray* nodeList;
|
SArray* nodeList;
|
||||||
SQueryPlan* pQueryPlan;
|
SQueryPlan* pQueryPlan;
|
||||||
SStmtQueryResInfo queryRes;
|
SStmtQueryResInfo queryRes;
|
||||||
|
bool autoCreateTbl;
|
||||||
} SStmtSQLInfo;
|
} SStmtSQLInfo;
|
||||||
|
|
||||||
typedef struct STscStmt {
|
typedef struct STscStmt {
|
||||||
|
|
|
@ -286,12 +286,12 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
||||||
pResInfo->precision = precision;
|
pResInfo->precision = precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
|
||||||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||||
|
|
||||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
||||||
pRequest->metric.start, &res);
|
pRequest->metric.start, NULL != pRes, &res);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pRequest->body.queryJob != 0) {
|
if (pRequest->body.queryJob != 0) {
|
||||||
schedulerFreeJob(pRequest->body.queryJob);
|
schedulerFreeJob(pRequest->body.queryJob);
|
||||||
|
@ -310,6 +310,10 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pRes) {
|
||||||
|
*pRes = res.res;
|
||||||
|
}
|
||||||
|
|
||||||
pRequest->code = res.code;
|
pRequest->code = res.code;
|
||||||
terrno = res.code;
|
terrno = res.code;
|
||||||
return pRequest->code;
|
return pRequest->code;
|
||||||
|
@ -320,7 +324,7 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
|
||||||
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
|
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
|
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
switch (pQuery->execMode) {
|
switch (pQuery->execMode) {
|
||||||
case QUERY_EXEC_MODE_LOCAL:
|
case QUERY_EXEC_MODE_LOCAL:
|
||||||
|
@ -333,7 +337,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
||||||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
|
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
|
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, res);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
break;
|
break;
|
||||||
|
@ -373,7 +377,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
return pRequest;
|
return pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
return launchQueryImpl(pRequest, pQuery, code, false);
|
return launchQueryImpl(pRequest, pQuery, code, false, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
|
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
|
||||||
|
|
|
@ -1668,7 +1668,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
|
||||||
smlBuildOutput(info->exec, info->pVgHash);
|
smlBuildOutput(info->exec, info->pVgHash);
|
||||||
info->cost.insertRpcTime = taosGetTimestampUs();
|
info->cost.insertRpcTime = taosGetTimestampUs();
|
||||||
|
|
||||||
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
|
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true, NULL);
|
||||||
|
|
||||||
info->affectedRows = taos_affected_rows(info->pRequest);
|
info->affectedRows = taos_affected_rows(info->pRequest);
|
||||||
return info->pRequest->code;
|
return info->pRequest->code;
|
||||||
|
|
|
@ -136,11 +136,12 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
pStmt->exec.pVgHash = pVgHash;
|
pStmt->exec.pVgHash = pVgHash;
|
||||||
pStmt->exec.pBlockHash = pBlockHash;
|
pStmt->exec.pBlockHash = pBlockHash;
|
||||||
|
pStmt->exec.autoCreateTbl = autoCreateTbl;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -149,10 +150,10 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName));
|
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName));
|
||||||
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
|
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl));
|
||||||
|
|
||||||
pStmt->sql.autoCreateTbl = autoCreateTbl;
|
|
||||||
|
|
||||||
|
pStmt->sql.autoCreateTbl = autoCreateTbl;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,8 +193,12 @@ int32_t stmtCacheBlock(STscStmt *pStmt) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
pStmt->bInfo.boundTags = NULL;
|
if (pStmt->sql.autoCreateTbl) {
|
||||||
|
pStmt->bInfo.tagsCached = true;
|
||||||
|
} else {
|
||||||
|
pStmt->bInfo.boundTags = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,6 +240,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
|
||||||
pStmt->bInfo.tbSuid = 0;
|
pStmt->bInfo.tbSuid = 0;
|
||||||
pStmt->bInfo.tbType = 0;
|
pStmt->bInfo.tbType = 0;
|
||||||
pStmt->bInfo.needParse = true;
|
pStmt->bInfo.needParse = true;
|
||||||
|
pStmt->bInfo.inExecCache = false;
|
||||||
|
|
||||||
pStmt->bInfo.tbName[0] = 0;
|
pStmt->bInfo.tbName[0] = 0;
|
||||||
pStmt->bInfo.tbFName[0] = 0;
|
pStmt->bInfo.tbFName[0] = 0;
|
||||||
|
@ -259,7 +265,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
|
||||||
char *key = taosHashGetKey(pIter, &keyLen);
|
char *key = taosHashGetKey(pIter, &keyLen);
|
||||||
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
|
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
|
||||||
|
|
||||||
if (keepTable && (pMeta->uid == pStmt->bInfo.tbUid)) {
|
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
|
||||||
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
|
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
|
||||||
|
|
||||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
|
@ -272,6 +278,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
|
||||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStmt->exec.autoCreateTbl = false;
|
||||||
|
|
||||||
if (keepTable) {
|
if (keepTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -315,11 +323,45 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
||||||
|
|
||||||
int32_t stmtGetFromCache(STscStmt* pStmt) {
|
int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
pStmt->bInfo.needParse = true;
|
pStmt->bInfo.needParse = true;
|
||||||
|
pStmt->bInfo.inExecCache = false;
|
||||||
|
|
||||||
|
STableDataBlocks *pBlockInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||||
|
if (pBlockInExec) {
|
||||||
|
pStmt->bInfo.inExecCache = true;
|
||||||
|
|
||||||
|
if (pStmt->sql.autoCreateTbl) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
||||||
|
if (pStmt->bInfo.inExecCache) {
|
||||||
|
ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1);
|
||||||
|
pStmt->bInfo.needParse = false;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pStmt->sql.autoCreateTbl) {
|
||||||
|
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
|
||||||
|
if (pCache) {
|
||||||
|
pStmt->bInfo.needParse = false;
|
||||||
|
|
||||||
|
STableDataBlocks* pNewBlock = NULL;
|
||||||
|
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock, 0));
|
||||||
|
|
||||||
|
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) {
|
||||||
|
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
STMT_RET(stmtCleanBindInfo(pStmt));
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == pStmt->pCatalog) {
|
if (NULL == pStmt->pCatalog) {
|
||||||
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
|
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
|
||||||
}
|
}
|
||||||
|
@ -347,7 +389,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName))) {
|
if (pStmt->bInfo.inExecCache) {
|
||||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
|
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
|
||||||
if (NULL == pCache) {
|
if (NULL == pCache) {
|
||||||
tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid);
|
tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid);
|
||||||
|
@ -484,11 +526,13 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
|
||||||
|
|
||||||
if (!pStmt->bInfo.needParse) { // table already cached, no need create table
|
if (pStmt->bInfo.needParse) {
|
||||||
return TSDB_CODE_SUCCESS;
|
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
if (pStmt->bInfo.inExecCache) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||||
if (NULL == pDataBlock) {
|
if (NULL == pDataBlock) {
|
||||||
|
@ -612,17 +656,73 @@ int stmtAddBatch(TAOS_STMT *stmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) {
|
||||||
|
if (pRsp->nBlocks <= 0) {
|
||||||
|
tscError("invalid submit resp block number %d", pRsp->nBlocks);
|
||||||
|
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t keyLen = 0;
|
||||||
|
STableDataBlocks **pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
STableDataBlocks *pBlock = *pIter;
|
||||||
|
char *key = taosHashGetKey(pIter, &keyLen);
|
||||||
|
|
||||||
|
STableMeta *pMeta = qGetTableMetaInDataBlock(pBlock);
|
||||||
|
if (pMeta->uid != pStmt->bInfo.tbUid) {
|
||||||
|
tscError("table uid %" PRIx64 " mis-match with current table uid %" PRIx64, pMeta->uid, pStmt->bInfo.tbUid);
|
||||||
|
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMeta->uid) {
|
||||||
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSubmitBlkRsp *blkRsp = NULL;
|
||||||
|
int32_t i = 0;
|
||||||
|
for (; i < pRsp->nBlocks; ++i) {
|
||||||
|
blkRsp = pRsp->pBlocks + i;
|
||||||
|
if (strlen(blkRsp->tblFName) != keyLen) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strncmp(blkRsp->tblFName, key, keyLen)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (i < pRsp->nBlocks) {
|
||||||
|
tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid, blkRsp->uid);
|
||||||
|
|
||||||
|
pMeta->uid = blkRsp->uid;
|
||||||
|
pStmt->bInfo.tbUid = blkRsp->uid;
|
||||||
|
} else {
|
||||||
|
tscError("table %s not found in submit rsp", pStmt->bInfo.tbFName);
|
||||||
|
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int stmtExec(TAOS_STMT *stmt) {
|
int stmtExec(TAOS_STMT *stmt) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SSubmitRsp *pRsp = NULL;
|
||||||
|
bool autoCreateTbl = pStmt->exec.autoCreateTbl;
|
||||||
|
|
||||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
||||||
|
|
||||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||||
scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList);
|
scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList, NULL);
|
||||||
} else {
|
} else {
|
||||||
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
|
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
|
||||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
|
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
||||||
|
@ -643,6 +743,15 @@ int stmtExec(TAOS_STMT *stmt) {
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && autoCreateTbl) {
|
||||||
|
if (NULL == pRsp) {
|
||||||
|
tscError("no submit resp got for auto create table");
|
||||||
|
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
STMT_ERR_RET(stmtUpdateTableUid(pStmt, pRsp));
|
||||||
|
}
|
||||||
|
|
||||||
++pStmt->sql.runTimes;
|
++pStmt->sql.runTimes;
|
||||||
|
|
||||||
|
|
|
@ -4022,7 +4022,7 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
|
||||||
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
|
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
|
||||||
if (pBlock->hashMeta) {
|
if (pBlock->hashMeta) {
|
||||||
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
|
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
|
||||||
if (tEncodeCStr(pEncoder, pBlock->ename) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
|
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
|
||||||
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
|
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
|
||||||
|
@ -4037,7 +4037,9 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
|
||||||
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
|
||||||
if (pBlock->hashMeta) {
|
if (pBlock->hashMeta) {
|
||||||
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
|
||||||
if (tDecodeCStr(pDecoder, &pBlock->dname) < 0) return -1;
|
pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
|
||||||
|
if (NULL == pBlock->tblFName) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
|
||||||
|
@ -4068,12 +4070,29 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
|
||||||
if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
|
||||||
pRsp->pBlocks = tDecoderMalloc(pDecoder, sizeof(*pRsp->pBlocks) * pRsp->nBlocks);
|
pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks));
|
||||||
if (pRsp->pBlocks == NULL) return -1;
|
if (pRsp->pBlocks == NULL) return -1;
|
||||||
for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
|
for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
|
||||||
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
|
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
|
tDecoderClear(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
|
||||||
|
if (NULL == pRsp) return;
|
||||||
|
|
||||||
|
if (pRsp->pBlocks) {
|
||||||
|
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
|
||||||
|
SSubmitBlkRsp *sRsp = pRsp->pBlocks + i;
|
||||||
|
taosMemoryFree(sRsp->tblFName);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pRsp->pBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -342,8 +342,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
|
||||||
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
|
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
|
||||||
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
|
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
|
||||||
|
|
||||||
pRsp->numOfRows = pRsp->numOfRows;
|
pRsp->numOfRows = pMsgIter->numOfRows;
|
||||||
pRsp->affectedRows = pRsp->affectedRows;
|
pRsp->affectedRows = pMsgIter->numOfRows;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,7 +119,7 @@ _exit:
|
||||||
taosMemoryFree(metaRsp.pSchemas);
|
taosMemoryFree(metaRsp.pSchemas);
|
||||||
metaReaderClear(&mer2);
|
metaReaderClear(&mer2);
|
||||||
metaReaderClear(&mer1);
|
metaReaderClear(&mer1);
|
||||||
return code;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
|
|
|
@ -497,43 +497,51 @@ _exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
||||||
|
SSubmitBlkIter blkIter = {0};
|
||||||
|
STSchema *pSchema = NULL;
|
||||||
|
tb_uid_t suid = 0;
|
||||||
|
STSRow *row = NULL;
|
||||||
|
|
||||||
|
tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
|
||||||
|
if (blkIter.row == NULL) return 0;
|
||||||
|
if (!pSchema || (suid != msgIter->suid)) {
|
||||||
|
if (pSchema) {
|
||||||
|
taosMemoryFreeClear(pSchema);
|
||||||
|
}
|
||||||
|
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema
|
||||||
|
if (pSchema) {
|
||||||
|
suid = msgIter->suid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!pSchema) {
|
||||||
|
printf("%s:%d no valid schema\n", tags, __LINE__);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char __tags[128] = {0};
|
||||||
|
snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
|
||||||
|
while ((row = tGetSubmitBlkNext(&blkIter))) {
|
||||||
|
tdSRowPrint(row, pSchema, __tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(pSchema);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
|
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
|
||||||
ASSERT(pMsg != NULL);
|
ASSERT(pMsg != NULL);
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
SMeta *pMeta = pVnode->pMeta;
|
SMeta *pMeta = pVnode->pMeta;
|
||||||
SSubmitBlk *pBlock = NULL;
|
SSubmitBlk *pBlock = NULL;
|
||||||
SSubmitBlkIter blkIter = {0};
|
|
||||||
STSRow *row = NULL;
|
|
||||||
STSchema *pSchema = NULL;
|
|
||||||
tb_uid_t suid = 0;
|
|
||||||
|
|
||||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||||
while (true) {
|
while (true) {
|
||||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
|
||||||
if (blkIter.row == NULL) continue;
|
|
||||||
if (!pSchema || (suid != msgIter.suid)) {
|
|
||||||
if (pSchema) {
|
|
||||||
taosMemoryFreeClear(pSchema);
|
|
||||||
}
|
|
||||||
pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema
|
|
||||||
if (pSchema) {
|
|
||||||
suid = msgIter.suid;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!pSchema) {
|
|
||||||
printf("%s:%d no valid schema\n", tags, __LINE__);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
char __tags[128] = {0};
|
|
||||||
snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter.uid);
|
|
||||||
while ((row = tGetSubmitBlkNext(&blkIter))) {
|
|
||||||
tdSRowPrint(row, pSchema, __tags);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(pSchema);
|
vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -589,8 +597,8 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
}
|
}
|
||||||
|
|
||||||
submitBlkRsp.uid = createTbReq.uid;
|
submitBlkRsp.uid = createTbReq.uid;
|
||||||
submitBlkRsp.ename = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||||
sprintf(submitBlkRsp.ename, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
||||||
|
|
||||||
msgIter.uid = createTbReq.uid;
|
msgIter.uid = createTbReq.uid;
|
||||||
if (createTbReq.type == TSDB_CHILD_TABLE) {
|
if (createTbReq.type == TSDB_CHILD_TABLE) {
|
||||||
|
@ -599,6 +607,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
||||||
msgIter.suid = 0;
|
msgIter.suid = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -621,7 +630,7 @@ _exit:
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
|
||||||
taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].ename);
|
taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(submitRsp.pArray);
|
taosArrayDestroy(submitRsp.pArray);
|
||||||
|
|
|
@ -839,6 +839,7 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName*
|
||||||
|
|
||||||
pMeta->uid = 0;
|
pMeta->uid = 0;
|
||||||
pMeta->vgId = vg.vgId;
|
pMeta->vgId = vg.vgId;
|
||||||
|
pMeta->tableType = TSDB_CHILD_TABLE;
|
||||||
|
|
||||||
STableMeta* pBackup = NULL;
|
STableMeta* pBackup = NULL;
|
||||||
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
|
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
|
||||||
|
@ -1094,8 +1095,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
|
|
||||||
sToken.z = tbName;
|
sToken.z = tbName;
|
||||||
sToken.n = strlen(tbName);
|
sToken.n = strlen(tbName);
|
||||||
|
|
||||||
autoCreateTbl = true;
|
|
||||||
} else {
|
} else {
|
||||||
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
|
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
|
||||||
}
|
}
|
||||||
|
@ -1112,6 +1111,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||||
if (TK_USING == sToken.type) {
|
if (TK_USING == sToken.type) {
|
||||||
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
|
autoCreateTbl = true;
|
||||||
} else {
|
} else {
|
||||||
CHECK_CODE(getTableMeta(pCxt, &name, tbFName));
|
CHECK_CODE(getTableMeta(pCxt, &name, tbFName));
|
||||||
}
|
}
|
||||||
|
|
|
@ -445,7 +445,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
||||||
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
||||||
int code = 0;
|
int code = 0;
|
||||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||||
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
|
STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
|
||||||
|
|
|
@ -159,6 +159,7 @@ typedef struct SSchTask {
|
||||||
|
|
||||||
typedef struct SSchJobAttr {
|
typedef struct SSchJobAttr {
|
||||||
EExplainMode explainMode;
|
EExplainMode explainMode;
|
||||||
|
bool needRes;
|
||||||
bool syncSchedule;
|
bool syncSchedule;
|
||||||
bool queryJob;
|
bool queryJob;
|
||||||
bool needFlowCtrl;
|
bool needFlowCtrl;
|
||||||
|
@ -190,6 +191,7 @@ typedef struct SSchJob {
|
||||||
SSchTask *fetchTask;
|
SSchTask *fetchTask;
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
SArray *errList; // SArray<SQueryErrorInfo>
|
SArray *errList; // SArray<SQueryErrorInfo>
|
||||||
|
SRWLatch resLock;
|
||||||
void *resData; //TODO free it or not
|
void *resData; //TODO free it or not
|
||||||
int32_t resNumOfRows;
|
int32_t resNumOfRows;
|
||||||
const char *sql;
|
const char *sql;
|
||||||
|
|
|
@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
|
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
|
||||||
int64_t startTs, bool syncSchedule) {
|
int64_t startTs, bool needRes, bool syncSchedule) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t refId = -1;
|
int64_t refId = -1;
|
||||||
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
|
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
|
||||||
|
@ -81,6 +81,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
|
||||||
|
|
||||||
pJob->attr.explainMode = pDag->explainInfo.mode;
|
pJob->attr.explainMode = pDag->explainInfo.mode;
|
||||||
pJob->attr.syncSchedule = syncSchedule;
|
pJob->attr.syncSchedule = syncSchedule;
|
||||||
|
pJob->attr.needRes = needRes;
|
||||||
pJob->transport = transport;
|
pJob->transport = transport;
|
||||||
pJob->sql = sql;
|
pJob->sql = sql;
|
||||||
|
|
||||||
|
@ -1133,16 +1134,39 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_VND_SUBMIT_RSP: {
|
case TDMT_VND_SUBMIT_RSP: {
|
||||||
if (msg) {
|
|
||||||
SSubmitRsp *rsp = (SSubmitRsp *)msg;
|
|
||||||
// SCH_ERR_JRET(rsp->code);
|
|
||||||
}
|
|
||||||
|
|
||||||
SCH_ERR_JRET(rspCode);
|
SCH_ERR_JRET(rspCode);
|
||||||
|
|
||||||
SSubmitRsp *rsp = (SSubmitRsp *)msg;
|
if (msg) {
|
||||||
if (rsp) {
|
SDecoder coder = {0};
|
||||||
pJob->resNumOfRows += rsp->affectedRows;
|
SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
|
||||||
|
tDecoderInit(&coder, msg, msgSize);
|
||||||
|
code = tDecodeSSubmitRsp(&coder, rsp);
|
||||||
|
if (code) {
|
||||||
|
SCH_TASK_ELOG("decode submitRsp failed, code:%d", code);
|
||||||
|
tFreeSSubmitRsp(rsp);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
|
||||||
|
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
|
||||||
|
|
||||||
|
if (pJob->attr.needRes) {
|
||||||
|
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
if (pJob->resData) {
|
||||||
|
SSubmitRsp *sum = pJob->resData;
|
||||||
|
sum->affectedRows += rsp->affectedRows;
|
||||||
|
sum->nBlocks += rsp->nBlocks;
|
||||||
|
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
|
||||||
|
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
|
||||||
|
taosMemoryFree(rsp->pBlocks);
|
||||||
|
taosMemoryFree(rsp);
|
||||||
|
} else {
|
||||||
|
pJob->resData = rsp;
|
||||||
|
}
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
} else {
|
||||||
|
tFreeSSubmitRsp(rsp);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||||
|
@ -2350,7 +2374,7 @@ void schFreeJobImpl(void *job) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||||
int64_t startTs, bool syncSchedule) {
|
int64_t startTs, bool needRes, bool syncSchedule) {
|
||||||
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
|
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
|
||||||
|
|
||||||
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
|
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
|
||||||
|
@ -2359,7 +2383,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSchJob *pJob = NULL;
|
SSchJob *pJob = NULL;
|
||||||
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
|
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule));
|
||||||
|
|
||||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||||
|
|
||||||
|
@ -2473,7 +2497,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
|
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
|
||||||
int64_t startTs, SQueryResult *pRes) {
|
int64_t startTs, bool needRes, SQueryResult *pRes) {
|
||||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
@ -2481,13 +2505,17 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
|
||||||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||||
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
||||||
} else {
|
} else {
|
||||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
|
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchJob *job = schAcquireJob(*pJob);
|
SSchJob *job = schAcquireJob(*pJob);
|
||||||
|
|
||||||
pRes->code = atomic_load_32(&job->errCode);
|
pRes->code = atomic_load_32(&job->errCode);
|
||||||
pRes->numOfRows = job->resNumOfRows;
|
pRes->numOfRows = job->resNumOfRows;
|
||||||
|
if (needRes) {
|
||||||
|
pRes->res = job->resData;
|
||||||
|
job->resData = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
schReleaseJob(*pJob);
|
schReleaseJob(*pJob);
|
||||||
|
|
||||||
|
@ -2502,7 +2530,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
|
||||||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||||
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
|
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
|
||||||
} else {
|
} else {
|
||||||
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
|
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -985,7 +985,7 @@ TEST(insertTest, normalCase) {
|
||||||
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
||||||
|
|
||||||
SQueryResult res = {0};
|
SQueryResult res = {0};
|
||||||
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
|
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(res.numOfRows, 20);
|
ASSERT_EQ(res.numOfRows, 20);
|
||||||
|
|
||||||
|
|
|
@ -213,7 +213,7 @@ CaseCtrl gCaseCtrl = { // default
|
||||||
|
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
CaseCtrl gCaseCtrl = { // default
|
CaseCtrl gCaseCtrl = {
|
||||||
.bindNullNum = 0,
|
.bindNullNum = 0,
|
||||||
.printCreateTblSql = true,
|
.printCreateTblSql = true,
|
||||||
.printQuerySql = true,
|
.printQuerySql = true,
|
||||||
|
@ -233,9 +233,9 @@ CaseCtrl gCaseCtrl = { // default
|
||||||
.printRes = true,
|
.printRes = true,
|
||||||
.runTimes = 0,
|
.runTimes = 0,
|
||||||
.caseIdx = -1,
|
.caseIdx = -1,
|
||||||
.caseNum = 1,
|
.caseNum = 15,
|
||||||
.caseRunIdx = 11,
|
.caseRunIdx = 8,
|
||||||
.caseRunNum = 1,
|
.caseRunNum = 15,
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue