Merge pull request #12349 from taosdata/feature/qnode
feat: stmt auto create table
This commit is contained in:
commit
31d0fa36a0
|
@ -252,24 +252,31 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
|
|||
int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema);
|
||||
|
||||
typedef struct {
|
||||
int32_t index; // index of failed block in submit blocks
|
||||
int32_t vnode; // vnode index of failed block
|
||||
int32_t sid; // table index of failed block
|
||||
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
|
||||
} SSubmitRspBlock;
|
||||
int8_t hashMeta;
|
||||
int64_t uid;
|
||||
char* tblFName;
|
||||
int32_t numOfRows;
|
||||
int32_t affectedRows;
|
||||
} SSubmitBlkRsp;
|
||||
|
||||
typedef struct {
|
||||
int32_t code; // 0-success, > 0 error code
|
||||
int32_t numOfRows; // number of records the client is trying to write
|
||||
int32_t affectedRows; // number of records actually written
|
||||
int32_t failedRows; // number of failed records (exclude duplicate records)
|
||||
int32_t numOfFailedBlocks;
|
||||
SSubmitRspBlock failedBlocks[];
|
||||
int32_t numOfRows;
|
||||
int32_t affectedRows;
|
||||
int32_t nBlocks;
|
||||
union {
|
||||
SArray* pArray;
|
||||
SSubmitBlkRsp* pBlocks;
|
||||
};
|
||||
} SSubmitRsp;
|
||||
|
||||
int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
|
||||
int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
|
||||
void tFreeSSubmitRsp(SSubmitRsp *pRsp);
|
||||
|
||||
#define COL_SMA_ON ((int8_t)0x1)
|
||||
#define COL_IDX_ON ((int8_t)0x2)
|
||||
#define COL_VAL_SET ((int8_t)0x4)
|
||||
|
||||
typedef struct SSchema {
|
||||
int8_t type;
|
||||
int8_t flags;
|
||||
|
@ -476,6 +483,7 @@ typedef struct {
|
|||
int32_t acctId;
|
||||
int64_t clusterId;
|
||||
uint32_t connId;
|
||||
int32_t dnodeNum;
|
||||
int8_t superUser;
|
||||
int8_t connType;
|
||||
SEpSet epSet;
|
||||
|
|
|
@ -26,8 +26,7 @@ extern "C" {
|
|||
typedef struct SStmtCallback {
|
||||
TAOS_STMT* pStmt;
|
||||
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
|
||||
int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*);
|
||||
int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*);
|
||||
int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*);
|
||||
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
|
||||
} SStmtCallback;
|
||||
|
||||
|
@ -59,8 +58,9 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
|
|||
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
|
||||
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
|
||||
void qFreeStmtDataBlock(void* pDataBlock);
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId);
|
||||
void qDestroyStmtDataBlock(void* pBlock);
|
||||
STableMeta *qGetTableMetaInDataBlock(void* pDataBlock);
|
||||
|
||||
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
|
||||
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
|
||||
|
|
|
@ -56,6 +56,7 @@ typedef struct SQueryResult {
|
|||
uint64_t numOfRows;
|
||||
int32_t msgSize;
|
||||
char *msg;
|
||||
void *res;
|
||||
} SQueryResult;
|
||||
|
||||
typedef struct STaskInfo {
|
||||
|
@ -71,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
|
|||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||
* @return
|
||||
*/
|
||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
|
||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes);
|
||||
|
||||
/**
|
||||
* Process the query job, generated according to the query physical plan.
|
||||
|
|
|
@ -307,9 +307,9 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
|
|||
// --- mq
|
||||
void hbMgrInitMqHbRspHandle();
|
||||
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res);
|
||||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
|
||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -55,6 +55,7 @@ typedef struct SStmtQueryResInfo {
|
|||
|
||||
typedef struct SStmtBindInfo {
|
||||
bool needParse;
|
||||
bool inExecCache;
|
||||
uint64_t tbUid;
|
||||
uint64_t tbSuid;
|
||||
int32_t sBindRowNum;
|
||||
|
@ -62,7 +63,9 @@ typedef struct SStmtBindInfo {
|
|||
int8_t tbType;
|
||||
bool tagsCached;
|
||||
void* boundTags;
|
||||
char* tbName;
|
||||
char tbName[TSDB_TABLE_FNAME_LEN];;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
char stbFName[TSDB_TABLE_FNAME_LEN];
|
||||
SName sname;
|
||||
} SStmtBindInfo;
|
||||
|
||||
|
@ -71,12 +74,12 @@ typedef struct SStmtExecInfo {
|
|||
SRequestObj* pRequest;
|
||||
SHashObj* pVgHash;
|
||||
SHashObj* pBlockHash;
|
||||
bool autoCreateTbl;
|
||||
} SStmtExecInfo;
|
||||
|
||||
typedef struct SStmtSQLInfo {
|
||||
STMT_TYPE type;
|
||||
STMT_STATUS status;
|
||||
bool autoCreate;
|
||||
uint64_t runTimes;
|
||||
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
||||
SQuery* pQuery;
|
||||
|
@ -85,6 +88,7 @@ typedef struct SStmtSQLInfo {
|
|||
SArray* nodeList;
|
||||
SQueryPlan* pQueryPlan;
|
||||
SStmtQueryResInfo queryRes;
|
||||
bool autoCreateTbl;
|
||||
} SStmtSQLInfo;
|
||||
|
||||
typedef struct STscStmt {
|
||||
|
|
|
@ -286,12 +286,12 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
|||
pResInfo->precision = precision;
|
||||
}
|
||||
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
|
||||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||
|
||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
||||
pRequest->metric.start, &res);
|
||||
pRequest->metric.start, NULL != pRes, &res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
|
@ -310,6 +310,10 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
}
|
||||
}
|
||||
|
||||
if (pRes) {
|
||||
*pRes = res.res;
|
||||
}
|
||||
|
||||
pRequest->code = res.code;
|
||||
terrno = res.code;
|
||||
return pRequest->code;
|
||||
|
@ -320,7 +324,7 @@ int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList)
|
|||
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
|
||||
}
|
||||
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
switch (pQuery->execMode) {
|
||||
case QUERY_EXEC_MODE_LOCAL:
|
||||
|
@ -333,7 +337,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
|
|||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
|
||||
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, res);
|
||||
}
|
||||
taosArrayDestroy(pNodeList);
|
||||
break;
|
||||
|
@ -373,7 +377,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
|||
return pRequest;
|
||||
}
|
||||
|
||||
return launchQueryImpl(pRequest, pQuery, code, false);
|
||||
return launchQueryImpl(pRequest, pQuery, code, false, NULL);
|
||||
}
|
||||
|
||||
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
return code;
|
||||
}
|
||||
|
||||
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
|
||||
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
|
||||
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
|
||||
}
|
||||
|
||||
|
|
|
@ -1660,7 +1660,7 @@ static int32_t smlInsertData(SSmlHandle* info) {
|
|||
smlBuildOutput(info->exec, info->pVgHash);
|
||||
info->cost.insertRpcTime = taosGetTimestampUs();
|
||||
|
||||
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true);
|
||||
launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true, NULL);
|
||||
|
||||
info->affectedRows = taos_affected_rows(info->pRequest);
|
||||
return info->pRequest->code;
|
||||
|
|
|
@ -67,8 +67,8 @@ int32_t stmtGetTbName(TAOS_STMT* stmt, char** tbName) {
|
|||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
|
||||
|
||||
if (NULL == pStmt->bInfo.tbName) {
|
||||
|
||||
if ('\0' == pStmt->bInfo.tbName[0]) {
|
||||
tscError("no table name set");
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
|
||||
}
|
||||
|
@ -121,9 +121,12 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
|
||||
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1);
|
||||
pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
|
||||
|
||||
pStmt->bInfo.tbUid = pTableMeta->uid;
|
||||
pStmt->bInfo.tbSuid = pTableMeta->suid;
|
||||
pStmt->bInfo.tbType = pTableMeta->tableType;
|
||||
|
@ -133,15 +136,28 @@ int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
pStmt->exec.pVgHash = pVgHash;
|
||||
pStmt->exec.pBlockHash = pBlockHash;
|
||||
pStmt->exec.autoCreateTbl = autoCreateTbl;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName));
|
||||
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl));
|
||||
|
||||
pStmt->sql.autoCreateTbl = autoCreateTbl;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
|
@ -156,16 +172,16 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uint64_t uid = pStmt->bInfo.tbUid;
|
||||
uint64_t tuid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
|
||||
uint64_t uid = pStmt->bInfo.tbUid;
|
||||
uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
|
||||
|
||||
if (taosHashGet(pStmt->sql.pTableCache, &tuid, sizeof(tuid))) {
|
||||
if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
|
||||
STableDataBlocks* pDst = NULL;
|
||||
|
||||
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
STableDataBlocks* pDst = NULL;
|
||||
|
||||
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
|
||||
|
||||
SStmtTableCache cache = {
|
||||
|
@ -173,22 +189,25 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
|
|||
.boundTags = pStmt->bInfo.boundTags,
|
||||
};
|
||||
|
||||
if (taosHashPut(pStmt->sql.pTableCache, &tuid, sizeof(tuid), &cache, sizeof(cache))) {
|
||||
if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pStmt->bInfo.boundTags = NULL;
|
||||
|
||||
if (pStmt->sql.autoCreateTbl) {
|
||||
pStmt->bInfo.tagsCached = true;
|
||||
} else {
|
||||
pStmt->bInfo.boundTags = NULL;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtParseSql(STscStmt* pStmt) {
|
||||
SStmtCallback stmtCb = {
|
||||
.pStmt = pStmt,
|
||||
.getTbNameFn = stmtGetTbName,
|
||||
.setBindInfoFn = stmtSetBindInfo,
|
||||
.setExecInfoFn = stmtSetExecInfo,
|
||||
.getExecInfoFn = stmtGetExecInfo,
|
||||
.pStmt = pStmt,
|
||||
.getTbNameFn = stmtGetTbName,
|
||||
.setInfoFn = stmtUpdateInfo,
|
||||
.getExecInfoFn = stmtGetExecInfo,
|
||||
};
|
||||
|
||||
if (NULL == pStmt->exec.pRequest) {
|
||||
|
@ -221,8 +240,10 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
|
|||
pStmt->bInfo.tbSuid = 0;
|
||||
pStmt->bInfo.tbType = 0;
|
||||
pStmt->bInfo.needParse = true;
|
||||
pStmt->bInfo.inExecCache = false;
|
||||
|
||||
taosMemoryFreeClear(pStmt->bInfo.tbName);
|
||||
pStmt->bInfo.tbName[0] = 0;
|
||||
pStmt->bInfo.tbFName[0] = 0;
|
||||
if (!pStmt->bInfo.tagsCached) {
|
||||
destroyBoundColumnInfo(pStmt->bInfo.boundTags);
|
||||
taosMemoryFreeClear(pStmt->bInfo.boundTags);
|
||||
|
@ -237,12 +258,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
|
|||
pStmt->exec.pRequest = NULL;
|
||||
}
|
||||
|
||||
void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||
size_t keyLen = 0;
|
||||
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||
while (pIter) {
|
||||
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
|
||||
uint64_t* key = taosHashGetKey(pIter, NULL);
|
||||
|
||||
if (keepTable && (*key == pStmt->bInfo.tbUid)) {
|
||||
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
|
||||
char *key = taosHashGetKey(pIter, &keyLen);
|
||||
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
|
||||
|
||||
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
|
||||
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
|
||||
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
|
@ -250,11 +273,13 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
|
|||
}
|
||||
|
||||
qFreeStmtDataBlock(pBlocks);
|
||||
taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
|
||||
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
|
||||
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
}
|
||||
|
||||
pStmt->exec.autoCreateTbl = false;
|
||||
|
||||
if (keepTable) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -296,10 +321,39 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks *pDataBlock, STableDataBlocks **newBlock, uint64_t uid) {
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
SVgroupInfo vgInfo = {0};
|
||||
|
||||
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &vgInfo));
|
||||
STMT_ERR_RET(taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
|
||||
|
||||
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtGetFromCache(STscStmt* pStmt) {
|
||||
pStmt->bInfo.needParse = true;
|
||||
pStmt->bInfo.inExecCache = false;
|
||||
|
||||
STableDataBlocks *pBlockInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (pBlockInExec) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
pStmt->bInfo.inExecCache = true;
|
||||
|
||||
if (pStmt->sql.autoCreateTbl) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
|
||||
if (pStmt->bInfo.inExecCache) {
|
||||
ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1);
|
||||
pStmt->bInfo.needParse = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -307,10 +361,31 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
|
||||
}
|
||||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
int32_t code =
|
||||
catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta);
|
||||
if (pStmt->sql.autoCreateTbl) {
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
|
||||
if (pCache) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
pStmt->exec.autoCreateTbl = true;
|
||||
|
||||
pStmt->bInfo.tbUid = 0;
|
||||
|
||||
STableDataBlocks* pNewBlock = NULL;
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, 0));
|
||||
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) {
|
||||
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STMT_RET(stmtCleanBindInfo(pStmt));
|
||||
}
|
||||
|
||||
|
||||
STableMeta *pTableMeta = NULL;
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
int32_t code = catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta);
|
||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
|
||||
|
||||
|
@ -323,18 +398,19 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
uint64_t suid = pTableMeta->suid;
|
||||
int8_t tableType = pTableMeta->tableType;
|
||||
taosMemoryFree(pTableMeta);
|
||||
|
||||
uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
|
||||
|
||||
if (uid == pStmt->bInfo.tbUid) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid))) {
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
|
||||
if (pStmt->bInfo.inExecCache) {
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
|
||||
if (NULL == pCache) {
|
||||
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", uid);
|
||||
|
||||
tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid);
|
||||
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -349,7 +425,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
|
||||
if (pCache) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
|
||||
|
@ -360,10 +436,9 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
pStmt->bInfo.tagsCached = true;
|
||||
|
||||
STableDataBlocks* pNewBlock = NULL;
|
||||
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));
|
||||
STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataBlock, &pNewBlock, uid));
|
||||
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock,
|
||||
POINTER_BYTES)) {
|
||||
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) {
|
||||
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -447,15 +522,15 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
|
|||
if (NULL == pStmt->exec.pRequest) {
|
||||
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
|
||||
}
|
||||
|
||||
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
|
||||
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
|
||||
|
||||
|
||||
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
|
||||
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);
|
||||
|
||||
STMT_ERR_RET(stmtGetFromCache(pStmt));
|
||||
|
||||
if (pStmt->bInfo.needParse) {
|
||||
taosMemoryFree(pStmt->bInfo.tbName);
|
||||
pStmt->bInfo.tbName = strdup(tbName);
|
||||
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
|
||||
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -466,16 +541,17 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
|
|||
|
||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
|
||||
|
||||
if (!pStmt->bInfo.needParse) {
|
||||
if (pStmt->bInfo.needParse) {
|
||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||
}
|
||||
|
||||
if (pStmt->bInfo.inExecCache) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STMT_ERR_RET(stmtParseSql(pStmt));
|
||||
|
||||
STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet(
|
||||
pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
|
||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -491,10 +567,9 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
|
|||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet(
|
||||
pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
|
||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -509,10 +584,9 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
|
|||
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet(
|
||||
pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
|
||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -557,11 +631,10 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
|||
bool emptyResult = false;
|
||||
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId, &emptyResult));
|
||||
}
|
||||
|
||||
STableDataBlocks** pDataBlock = (STableDataBlocks**)taosHashGet(
|
||||
pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
|
||||
|
||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
|
||||
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -600,17 +673,73 @@ int stmtAddBatch(TAOS_STMT* stmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int stmtExec(TAOS_STMT* stmt) {
|
||||
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp *pRsp) {
|
||||
if (pRsp->nBlocks <= 0) {
|
||||
tscError("invalid submit resp block number %d", pRsp->nBlocks);
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
size_t keyLen = 0;
|
||||
STableDataBlocks **pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||
while (pIter) {
|
||||
STableDataBlocks *pBlock = *pIter;
|
||||
char *key = taosHashGetKey(pIter, &keyLen);
|
||||
|
||||
STableMeta *pMeta = qGetTableMetaInDataBlock(pBlock);
|
||||
if (pMeta->uid != pStmt->bInfo.tbUid) {
|
||||
tscError("table uid %" PRIx64 " mis-match with current table uid %" PRIx64, pMeta->uid, pStmt->bInfo.tbUid);
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
if (pMeta->uid) {
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
SSubmitBlkRsp *blkRsp = NULL;
|
||||
int32_t i = 0;
|
||||
for (; i < pRsp->nBlocks; ++i) {
|
||||
blkRsp = pRsp->pBlocks + i;
|
||||
if (strlen(blkRsp->tblFName) != keyLen) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strncmp(blkRsp->tblFName, key, keyLen)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (i < pRsp->nBlocks) {
|
||||
tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid, blkRsp->uid);
|
||||
|
||||
pMeta->uid = blkRsp->uid;
|
||||
pStmt->bInfo.tbUid = blkRsp->uid;
|
||||
} else {
|
||||
tscError("table %s not found in submit rsp", pStmt->bInfo.tbFName);
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int stmtExec(TAOS_STMT *stmt) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
SSubmitRsp *pRsp = NULL;
|
||||
bool autoCreateTbl = pStmt->exec.autoCreateTbl;
|
||||
|
||||
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
|
||||
|
||||
if (STMT_TYPE_QUERY == pStmt->sql.type) {
|
||||
scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList);
|
||||
scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList, NULL);
|
||||
} else {
|
||||
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
|
||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
|
||||
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL));
|
||||
}
|
||||
|
||||
if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
|
||||
|
@ -632,6 +761,15 @@ _return:
|
|||
|
||||
stmtCleanExecInfo(pStmt, (code ? false : true), false);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && autoCreateTbl) {
|
||||
if (NULL == pRsp) {
|
||||
tscError("no submit resp got for auto create table");
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
STMT_ERR_RET(stmtUpdateTableUid(pStmt, pRsp));
|
||||
}
|
||||
|
||||
++pStmt->sql.runTimes;
|
||||
|
||||
STMT_RET(code);
|
||||
|
|
|
@ -2764,6 +2764,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
|||
if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1;
|
||||
if (tEncodeU32(&encoder, pRsp->connId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->dnodeNum) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
|
||||
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
|
||||
|
@ -2783,6 +2784,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
|||
if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
|
||||
if (tDecodeU32(&decoder, &pRsp->connId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->dnodeNum) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
|
||||
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
|
||||
|
@ -4026,3 +4028,84 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
|
|||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1;
|
||||
if (pBlock->hashMeta) {
|
||||
if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1;
|
||||
if (tEncodeCStr(pEncoder, pBlock->tblFName) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1;
|
||||
if (pBlock->hashMeta) {
|
||||
if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1;
|
||||
pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1);
|
||||
if (NULL == pBlock->tblFName) return -1;
|
||||
if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSSubmitRsp(SEncoder *pEncoder, const SSubmitRsp *pRsp) {
|
||||
int32_t nBlocks = taosArrayGetSize(pRsp->pArray);
|
||||
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32v(pEncoder, pRsp->numOfRows) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, pRsp->affectedRows) < 0) return -1;
|
||||
if (tEncodeI32v(pEncoder, nBlocks) < 0) return -1;
|
||||
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
|
||||
if (tEncodeSSubmitBlkRsp(pEncoder, (SSubmitBlkRsp *)taosArrayGet(pRsp->pArray, iBlock)) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndEncode(pEncoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
|
||||
if (tDecodeI32v(pDecoder, &pRsp->numOfRows) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pRsp->affectedRows) < 0) return -1;
|
||||
if (tDecodeI32v(pDecoder, &pRsp->nBlocks) < 0) return -1;
|
||||
pRsp->pBlocks = taosMemoryCalloc(pRsp->nBlocks, sizeof(*pRsp->pBlocks));
|
||||
if (pRsp->pBlocks == NULL) return -1;
|
||||
for (int32_t iBlock = 0; iBlock < pRsp->nBlocks; iBlock++) {
|
||||
if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
tDecoderClear(pDecoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSSubmitRsp(SSubmitRsp *pRsp) {
|
||||
if (NULL == pRsp) return;
|
||||
|
||||
if (pRsp->pBlocks) {
|
||||
for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
|
||||
SSubmitBlkRsp *sRsp = pRsp->pBlocks + i;
|
||||
taosMemoryFree(sRsp->tblFName);
|
||||
}
|
||||
|
||||
taosMemoryFree(pRsp->pBlocks);
|
||||
}
|
||||
|
||||
taosMemoryFree(pRsp);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "mndShow.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndDnode.h"
|
||||
#include "tglobal.h"
|
||||
#include "version.h"
|
||||
|
||||
|
@ -227,6 +228,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
|
|||
connectRsp.clusterId = pMnode->clusterId;
|
||||
connectRsp.connId = pConn->id;
|
||||
connectRsp.connType = connReq.connType;
|
||||
connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
|
||||
|
||||
snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
|
||||
gitinfo);
|
||||
|
|
|
@ -102,7 +102,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version
|
|||
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
|
||||
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t* pAffectedRows);
|
||||
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
|
||||
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
|
||||
|
|
|
@ -290,7 +290,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
|
||||
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STsdbMemTable *pMemTable = pTsdb->mem;
|
||||
void *tptr;
|
||||
|
@ -344,7 +344,8 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
|
|||
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
|
||||
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
|
||||
|
||||
(*pAffectedRows) = pMsgIter->numOfRows;
|
||||
pRsp->numOfRows = pMsgIter->numOfRows;
|
||||
pRsp->affectedRows = pMsgIter->numOfRows;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -36,9 +36,10 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
|||
// loop to insert
|
||||
tInitSubmitMsgIter(pMsg, &msgIter);
|
||||
while (true) {
|
||||
SSubmitBlkRsp r = {0};
|
||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) {
|
||||
if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &r) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -46,8 +47,8 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
|
|||
}
|
||||
|
||||
if (pRsp != NULL) {
|
||||
pRsp->affectedRows = affectedrows;
|
||||
pRsp->numOfRows = numOfRows;
|
||||
// pRsp->affectedRows = affectedrows;
|
||||
// pRsp->numOfRows = numOfRows;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -119,7 +119,7 @@ _exit:
|
|||
taosMemoryFree(metaRsp.pSchemas);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||
|
|
|
@ -502,55 +502,66 @@ _exit:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSchema *pSchema = NULL;
|
||||
tb_uid_t suid = 0;
|
||||
STSRow *row = NULL;
|
||||
|
||||
tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
|
||||
if (blkIter.row == NULL) return 0;
|
||||
if (!pSchema || (suid != msgIter->suid)) {
|
||||
if (pSchema) {
|
||||
taosMemoryFreeClear(pSchema);
|
||||
}
|
||||
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema
|
||||
if (pSchema) {
|
||||
suid = msgIter->suid;
|
||||
}
|
||||
}
|
||||
if (!pSchema) {
|
||||
printf("%s:%d no valid schema\n", tags, __LINE__);
|
||||
return -1;
|
||||
}
|
||||
char __tags[128] = {0};
|
||||
snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter->uid);
|
||||
while ((row = tGetSubmitBlkNext(&blkIter))) {
|
||||
tdSRowPrint(row, pSchema, __tags);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pSchema);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
|
||||
ASSERT(pMsg != NULL);
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SMeta *pMeta = pVnode->pMeta;
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
STSRow *row = NULL;
|
||||
STSchema *pSchema = NULL;
|
||||
tb_uid_t suid = 0;
|
||||
|
||||
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
|
||||
if (blkIter.row == NULL) continue;
|
||||
if (!pSchema || (suid != msgIter.suid)) {
|
||||
if (pSchema) {
|
||||
taosMemoryFreeClear(pSchema);
|
||||
}
|
||||
pSchema = metaGetTbTSchema(pMeta, msgIter.suid, 0); // TODO: use the real schema
|
||||
if (pSchema) {
|
||||
suid = msgIter.suid;
|
||||
}
|
||||
}
|
||||
if (!pSchema) {
|
||||
printf("%s:%d no valid schema\n", tags, __LINE__);
|
||||
continue;
|
||||
}
|
||||
char __tags[128] = {0};
|
||||
snprintf(__tags, 128, "%s: uid %" PRIi64 " ", tags, msgIter.uid);
|
||||
while ((row = tGetSubmitBlkNext(&blkIter))) {
|
||||
tdSRowPrint(row, pSchema, __tags);
|
||||
}
|
||||
|
||||
vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pSchema);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
|
||||
SSubmitRsp submitRsp = {0};
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk *pBlock;
|
||||
SSubmitRsp rsp = {0};
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
SDecoder decoder = {0};
|
||||
int32_t nRows;
|
||||
int32_t tsize, ret;
|
||||
SEncoder encoder = {0};
|
||||
|
||||
pRsp->code = 0;
|
||||
|
||||
|
@ -564,12 +575,17 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
|
||||
for (int i = 0;;) {
|
||||
tGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
SSubmitBlkRsp submitBlkRsp = {0};
|
||||
|
||||
// create table for auto create table mode
|
||||
if (msgIter.schemaLen > 0) {
|
||||
submitBlkRsp.hashMeta = 1;
|
||||
|
||||
tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
|
||||
if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
|
||||
pRsp->code = TSDB_CODE_INVALID_MSG;
|
||||
|
@ -585,6 +601,10 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
}
|
||||
}
|
||||
|
||||
submitBlkRsp.uid = createTbReq.uid;
|
||||
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
|
||||
sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name);
|
||||
|
||||
msgIter.uid = createTbReq.uid;
|
||||
if (createTbReq.type == TSDB_CHILD_TABLE) {
|
||||
msgIter.suid = createTbReq.ctb.suid;
|
||||
|
@ -592,22 +612,33 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
|
|||
msgIter.suid = 0;
|
||||
}
|
||||
|
||||
vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
|
||||
tDecoderClear(&decoder);
|
||||
}
|
||||
|
||||
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) {
|
||||
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) {
|
||||
pRsp->code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
rsp.affectedRows += nRows;
|
||||
submitRsp.numOfRows += submitBlkRsp.numOfRows;
|
||||
submitRsp.affectedRows += submitBlkRsp.affectedRows;
|
||||
taosArrayPush(submitRsp.pArray, &submitBlkRsp);
|
||||
}
|
||||
|
||||
_exit:
|
||||
// encode the response (TODO)
|
||||
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
||||
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
||||
pRsp->contLen = sizeof(SSubmitRsp);
|
||||
tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
|
||||
pRsp->pCont = rpcMallocCont(tsize);
|
||||
pRsp->contLen = tsize;
|
||||
tEncoderInit(&encoder, pRsp->pCont, tsize);
|
||||
tEncodeSSubmitRsp(&encoder, &submitRsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
|
||||
taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
|
||||
}
|
||||
|
||||
taosArrayDestroy(submitRsp.pArray);
|
||||
|
||||
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
|
||||
|
||||
|
|
|
@ -137,7 +137,7 @@ void destroyBlockArrayList(SArray* pDataBlockList);
|
|||
void destroyBlockHashmap(SHashObj* pDataBlockHash);
|
||||
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
|
||||
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
|
||||
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
|
||||
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq);
|
||||
|
|
|
@ -237,13 +237,8 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) {
|
||||
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *dbFname, bool isStb) {
|
||||
SParseContext* pBasicCtx = pCxt->pComCxt;
|
||||
SName name = {0};
|
||||
createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg);
|
||||
|
||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
|
||||
bool pass = false;
|
||||
CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser,
|
||||
|
@ -252,22 +247,22 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
|
|||
return TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||
}
|
||||
if (isStb) {
|
||||
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
|
||||
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
|
||||
&pCxt->pTableMeta));
|
||||
} else {
|
||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
|
||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
|
||||
&pCxt->pTableMeta));
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(
|
||||
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, false); }
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, false); }
|
||||
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, true); }
|
||||
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, true); }
|
||||
|
||||
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
|
||||
while (start < end) {
|
||||
|
@ -841,8 +836,9 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName*
|
|||
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
|
||||
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
|
||||
|
||||
pMeta->uid = tGenIdPI64();
|
||||
pMeta->uid = 0;
|
||||
pMeta->vgId = vg.vgId;
|
||||
pMeta->tableType = TSDB_CHILD_TABLE;
|
||||
|
||||
STableMeta* pBackup = NULL;
|
||||
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
|
||||
|
@ -852,11 +848,7 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName*
|
|||
}
|
||||
|
||||
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
|
||||
SName name;
|
||||
createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&name, tbFName);
|
||||
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
|
||||
int32_t len = strlen(tbFName);
|
||||
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
|
||||
if (NULL != pMeta) {
|
||||
|
@ -866,11 +858,17 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
SToken sToken;
|
||||
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
CHECK_CODE(getSTableMeta(pCxt, &sToken));
|
||||
|
||||
SName sname;
|
||||
createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
|
||||
char stbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&sname, stbFName);
|
||||
|
||||
CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
|
||||
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
|
||||
}
|
||||
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, &name, tbFName, len, pCxt->pTableMeta));
|
||||
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
|
||||
|
||||
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
|
||||
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
|
||||
|
@ -890,7 +888,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
if (TK_NK_LP != sToken.type) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
|
||||
}
|
||||
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name.tname));
|
||||
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
if (TK_NK_RP != sToken.type) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
|
||||
|
@ -1065,6 +1063,8 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
|||
// [...];
|
||||
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
||||
int32_t tbNum = 0;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
bool autoCreateTbl = false;
|
||||
|
||||
// for each table
|
||||
while (1) {
|
||||
|
@ -1102,16 +1102,21 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
SToken tbnameToken = sToken;
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
|
||||
SName name;
|
||||
createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
|
||||
tNameExtractFullName(&name, tbFName);
|
||||
|
||||
// USING cluase
|
||||
if (TK_USING == sToken.type) {
|
||||
CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
|
||||
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
autoCreateTbl = true;
|
||||
} else {
|
||||
CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
|
||||
CHECK_CODE(getTableMeta(pCxt, &name, tbFName));
|
||||
}
|
||||
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
|
||||
&dataBuf, NULL, &pCxt->createTblReq));
|
||||
|
||||
|
@ -1153,10 +1158,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
|
||||
(*pCxt->pStmtCb->setBindInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags);
|
||||
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
||||
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
|
||||
|
||||
(*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
|
||||
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
||||
pCxt->pVgroupsHashObj = NULL;
|
||||
pCxt->pTableBlockHashObj = NULL;
|
||||
pCxt->pTableMeta = NULL;
|
||||
|
@ -1193,7 +1197,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
|||
&context.pTableBlockHashObj);
|
||||
} else {
|
||||
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
|
||||
}
|
||||
|
||||
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
|
||||
|
@ -1389,7 +1393,6 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
|
|||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
#endif
|
||||
|
||||
pDataBlock->size += extendedRowSize;
|
||||
}
|
||||
|
||||
|
@ -1476,6 +1479,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
|
|||
taosMemoryFree(pSTSchema);
|
||||
}
|
||||
#endif
|
||||
|
||||
}
|
||||
|
||||
if (rowEnd) {
|
||||
|
@ -1673,9 +1677,10 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsFormat, SArray* cols
|
|||
buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid);
|
||||
|
||||
STableDataBlocks* pDataBlock = NULL;
|
||||
ret = getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pTableMeta).rowSize, pTableMeta, &pDataBlock, NULL, &smlHandle->createTblReq);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta,
|
||||
&pDataBlock, NULL, &smlHandle->createTblReq);
|
||||
if(ret != TSDB_CODE_SUCCESS){
|
||||
buildInvalidOperationMsg(&pBuf, "create data block error");
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -185,11 +185,11 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
|
||||
SVCreateTbReq* pCreateTbReq) {
|
||||
*dataBlocks = NULL;
|
||||
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
|
||||
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
|
||||
if (t1 != NULL) {
|
||||
*dataBlocks = *t1;
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
|
|||
}
|
||||
}
|
||||
|
||||
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
|
||||
taosHashPut(pHashList, (const char*)id, idLen, (char*)dataBlocks, POINTER_BYTES);
|
||||
if (pBlockList) {
|
||||
taosArrayPush(pBlockList, dataBlocks);
|
||||
}
|
||||
|
@ -445,7 +445,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
|||
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
|
||||
int code = 0;
|
||||
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
|
||||
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
|
||||
|
||||
STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
|
||||
|
@ -457,7 +457,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
|
|||
STableDataBlocks* dataBuf = NULL;
|
||||
pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId; // for schemaless, restore origin vgId
|
||||
int32_t ret =
|
||||
getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
|
||||
getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
|
||||
pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
taosHashCleanup(pVnodeDataBlockHashList);
|
||||
|
@ -620,7 +620,7 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
|
|||
return qResetStmtDataBlock(*pDst, false);
|
||||
}
|
||||
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
|
||||
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
|
||||
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
|
||||
if (code) {
|
||||
return code;
|
||||
|
@ -633,11 +633,22 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pBlock->vgId = vgId;
|
||||
|
||||
if (pBlock->pTableMeta) {
|
||||
pBlock->pTableMeta->uid = uid;
|
||||
pBlock->pTableMeta->vgId = vgId;
|
||||
}
|
||||
|
||||
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta *qGetTableMetaInDataBlock(void* pDataBlock) {
|
||||
return ((STableDataBlocks*)pDataBlock)->pTableMeta;
|
||||
}
|
||||
|
||||
void qFreeStmtDataBlock(void* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
|
|
|
@ -260,7 +260,7 @@ int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
|
|||
comparFn = 20;
|
||||
} else if (optr == OP_TYPE_LIKE) {
|
||||
comparFn = 9;
|
||||
} else if (optr == OP_TYPE_LIKE) {
|
||||
} else if (optr == OP_TYPE_NOT_LIKE) {
|
||||
comparFn = 27;
|
||||
} else if (optr == OP_TYPE_IN) {
|
||||
comparFn = 8;
|
||||
|
@ -3512,7 +3512,8 @@ void fltConvertToTsValueNode(SFltTreeStat *stat, SValueNode* valueNode) {
|
|||
valueNode->datum.i = 0;
|
||||
}
|
||||
taosMemoryFree(timeStr);
|
||||
|
||||
|
||||
valueNode->typeData = valueNode->datum.i;
|
||||
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
}
|
||||
|
|
|
@ -92,8 +92,9 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
|
|||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
len = varDataLen(out.columnData->pData);
|
||||
buf = varDataVal(out.columnData->pData);
|
||||
char* data = colDataGetVarData(out.columnData, 0);
|
||||
len = varDataLen(data);
|
||||
buf = varDataVal(data);
|
||||
} else {
|
||||
len = tDataTypes[type].bytes;
|
||||
buf = out.columnData->pData;
|
||||
|
@ -109,7 +110,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
|
|||
}
|
||||
|
||||
if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) {
|
||||
sclError("taosHashPut failed");
|
||||
sclError("taosHashPut to set failed");
|
||||
SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
|
|
@ -328,9 +328,10 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam* pOut, int32_t rowInd
|
|||
static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIndex) {
|
||||
int32_t len = 0;
|
||||
int32_t inputLen = varDataLen(buf);
|
||||
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
|
||||
|
||||
char* t = taosMemoryCalloc(1,(inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
|
||||
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), pOut->columnData->info.bytes, &len);
|
||||
char* t = taosMemoryCalloc(1, outputMaxLen);
|
||||
/*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), outputMaxLen, &len);
|
||||
varDataSetLen(t, len);
|
||||
|
||||
colDataAppend(pOut->columnData, rowIndex, t, false);
|
||||
|
@ -512,7 +513,7 @@ int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int1
|
|||
if (outType == TSDB_DATA_TYPE_NCHAR) {
|
||||
varToNchar(tmp, pOut, i);
|
||||
} else {
|
||||
colDataAppend(pOutputCol, i, (char *)&value, false);
|
||||
colDataAppend(pOutputCol, i, (char *)tmp, false);
|
||||
}
|
||||
}
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(inType)) {
|
||||
|
@ -529,7 +530,7 @@ int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int1
|
|||
if (outType == TSDB_DATA_TYPE_NCHAR) {
|
||||
varToNchar(tmp, pOut, i);
|
||||
} else {
|
||||
colDataAppend(pOutputCol, i, (char *)&value, false);
|
||||
colDataAppend(pOutputCol, i, (char *)tmp, false);
|
||||
}
|
||||
}
|
||||
} else if (IS_FLOAT_TYPE(inType)) {
|
||||
|
@ -546,7 +547,7 @@ int32_t vectorConvertToVarData(const SScalarParam* pIn, SScalarParam* pOut, int1
|
|||
if (outType == TSDB_DATA_TYPE_NCHAR) {
|
||||
varToNchar(tmp, pOut, i);
|
||||
} else {
|
||||
colDataAppend(pOutputCol, i, (char *)&value, false);
|
||||
colDataAppend(pOutputCol, i, (char *)tmp, false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -159,6 +159,7 @@ typedef struct SSchTask {
|
|||
|
||||
typedef struct SSchJobAttr {
|
||||
EExplainMode explainMode;
|
||||
bool needRes;
|
||||
bool syncSchedule;
|
||||
bool queryJob;
|
||||
bool needFlowCtrl;
|
||||
|
@ -190,6 +191,7 @@ typedef struct SSchJob {
|
|||
SSchTask *fetchTask;
|
||||
int32_t errCode;
|
||||
SArray *errList; // SArray<SQueryErrorInfo>
|
||||
SRWLatch resLock;
|
||||
void *resData; //TODO free it or not
|
||||
int32_t resNumOfRows;
|
||||
const char *sql;
|
||||
|
|
|
@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
|||
}
|
||||
|
||||
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
|
||||
int64_t startTs, bool syncSchedule) {
|
||||
int64_t startTs, bool needRes, bool syncSchedule) {
|
||||
int32_t code = 0;
|
||||
int64_t refId = -1;
|
||||
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
|
||||
|
@ -81,6 +81,7 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
|
|||
|
||||
pJob->attr.explainMode = pDag->explainInfo.mode;
|
||||
pJob->attr.syncSchedule = syncSchedule;
|
||||
pJob->attr.needRes = needRes;
|
||||
pJob->transport = transport;
|
||||
pJob->sql = sql;
|
||||
|
||||
|
@ -1133,16 +1134,39 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
break;
|
||||
}
|
||||
case TDMT_VND_SUBMIT_RSP: {
|
||||
if (msg) {
|
||||
SSubmitRsp *rsp = (SSubmitRsp *)msg;
|
||||
SCH_ERR_JRET(rsp->code);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
|
||||
SSubmitRsp *rsp = (SSubmitRsp *)msg;
|
||||
if (rsp) {
|
||||
pJob->resNumOfRows += rsp->affectedRows;
|
||||
if (msg) {
|
||||
SDecoder coder = {0};
|
||||
SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
|
||||
tDecoderInit(&coder, msg, msgSize);
|
||||
code = tDecodeSSubmitRsp(&coder, rsp);
|
||||
if (code) {
|
||||
SCH_TASK_ELOG("decode submitRsp failed, code:%d", code);
|
||||
tFreeSSubmitRsp(rsp);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
|
||||
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
|
||||
|
||||
if (pJob->attr.needRes) {
|
||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||
if (pJob->resData) {
|
||||
SSubmitRsp *sum = pJob->resData;
|
||||
sum->affectedRows += rsp->affectedRows;
|
||||
sum->nBlocks += rsp->nBlocks;
|
||||
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
|
||||
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
|
||||
taosMemoryFree(rsp->pBlocks);
|
||||
taosMemoryFree(rsp);
|
||||
} else {
|
||||
pJob->resData = rsp;
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||
} else {
|
||||
tFreeSSubmitRsp(rsp);
|
||||
}
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
@ -2350,7 +2374,7 @@ void schFreeJobImpl(void *job) {
|
|||
}
|
||||
|
||||
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
int64_t startTs, bool syncSchedule) {
|
||||
int64_t startTs, bool needRes, bool syncSchedule) {
|
||||
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
|
||||
|
||||
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
|
||||
|
@ -2359,7 +2383,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
|
|||
|
||||
int32_t code = 0;
|
||||
SSchJob *pJob = NULL;
|
||||
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
|
||||
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule));
|
||||
|
||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||
|
||||
|
@ -2473,7 +2497,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
|||
}
|
||||
|
||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
|
||||
int64_t startTs, SQueryResult *pRes) {
|
||||
int64_t startTs, bool needRes, SQueryResult *pRes) {
|
||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
@ -2481,13 +2505,17 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
|
|||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
||||
} else {
|
||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
|
||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true));
|
||||
}
|
||||
|
||||
SSchJob *job = schAcquireJob(*pJob);
|
||||
|
||||
pRes->code = atomic_load_32(&job->errCode);
|
||||
pRes->numOfRows = job->resNumOfRows;
|
||||
if (needRes) {
|
||||
pRes->res = job->resData;
|
||||
job->resData = NULL;
|
||||
}
|
||||
|
||||
schReleaseJob(*pJob);
|
||||
|
||||
|
@ -2502,7 +2530,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
|
|||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
|
||||
} else {
|
||||
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
|
||||
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -985,7 +985,7 @@ TEST(insertTest, normalCase) {
|
|||
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
||||
|
||||
SQueryResult res = {0};
|
||||
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
|
||||
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(res.numOfRows, 20);
|
||||
|
||||
|
|
|
@ -48,10 +48,18 @@ int64_t tGenIdPI64(void) {
|
|||
}
|
||||
}
|
||||
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
uint64_t pid = taosGetPId();
|
||||
int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1);
|
||||
int64_t id;
|
||||
|
||||
while (true) {
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
uint64_t pid = taosGetPId();
|
||||
int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1);
|
||||
|
||||
id = ((tUUIDHashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
|
||||
if (id) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t id = ((tUUIDHashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
|
||||
return id;
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue