fix:error for sml
This commit is contained in:
parent
d86e4cee9c
commit
96aa44e2cd
|
@ -106,11 +106,10 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
|
||||||
|
|
||||||
void qDestroyBoundColInfo(void* pInfo);
|
void qDestroyBoundColInfo(void* pInfo);
|
||||||
|
|
||||||
void* smlInitHandle(SQuery* pQuery);
|
SQuery* smlInitHandle();
|
||||||
void smlDestroyHandle(void* pHandle);
|
int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
||||||
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
||||||
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
|
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
||||||
|
|
||||||
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
||||||
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
||||||
|
|
|
@ -170,7 +170,6 @@ typedef struct {
|
||||||
SHashObj *childTables;
|
SHashObj *childTables;
|
||||||
SHashObj *superTables;
|
SHashObj *superTables;
|
||||||
SHashObj *pVgHash;
|
SHashObj *pVgHash;
|
||||||
void *exec;
|
|
||||||
|
|
||||||
STscObj *taos;
|
STscObj *taos;
|
||||||
SCatalog *pCatalog;
|
SCatalog *pCatalog;
|
||||||
|
@ -1488,7 +1487,6 @@ static void smlDestroyCols(SArray *cols) {
|
||||||
static void smlDestroyInfo(SSmlHandle *info) {
|
static void smlDestroyInfo(SSmlHandle *info) {
|
||||||
if (!info) return;
|
if (!info) return;
|
||||||
qDestroyQuery(info->pQuery);
|
qDestroyQuery(info->pQuery);
|
||||||
smlDestroyHandle(info->exec);
|
|
||||||
|
|
||||||
// destroy info->childTables
|
// destroy info->childTables
|
||||||
void **p1 = (void **)taosHashIterate(info->childTables, NULL);
|
void **p1 = (void **)taosHashIterate(info->childTables, NULL);
|
||||||
|
@ -1526,19 +1524,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
}
|
}
|
||||||
info->id = smlGenId();
|
info->id = smlGenId();
|
||||||
|
|
||||||
info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
|
info->pQuery = smlInitHandle();
|
||||||
if (NULL == info->pQuery) {
|
|
||||||
uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
|
||||||
info->pQuery->haveResultSet = false;
|
|
||||||
info->pQuery->msgType = TDMT_VND_SUBMIT;
|
|
||||||
info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
|
||||||
if (NULL == info->pQuery->pRoot) {
|
|
||||||
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTscObj) {
|
if (pTscObj) {
|
||||||
info->taos = pTscObj;
|
info->taos = pTscObj;
|
||||||
|
@ -1561,10 +1547,8 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
info->pRequest = request;
|
info->pRequest = request;
|
||||||
info->msgBuf.buf = info->pRequest->msgBuf;
|
info->msgBuf.buf = info->pRequest->msgBuf;
|
||||||
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||||
info->pRequest->stmtType = info->pQuery->pRoot->type;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info->exec = smlInitHandle(info->pQuery);
|
|
||||||
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
@ -1577,7 +1561,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
|
if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
|
||||||
NULL == info->dumplicateKey) {
|
NULL == info->dumplicateKey) {
|
||||||
uError("SML:0x%" PRIx64 " create info failed", info->id);
|
uError("SML:0x%" PRIx64 " create info failed", info->id);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
|
@ -2337,7 +2321,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
(*pMeta)->tableMeta->vgId = vg.vgId;
|
(*pMeta)->tableMeta->vgId = vg.vgId;
|
||||||
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
||||||
|
|
||||||
code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
|
code = smlBindData(info->pQuery, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
|
||||||
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
||||||
info->ttl, info->msgBuf.buf, info->msgBuf.len);
|
info->ttl, info->msgBuf.buf, info->msgBuf.len);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2347,7 +2331,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
|
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = smlBuildOutput(info->exec, info->pVgHash);
|
code = smlBuildOutput(info->pQuery, info->pVgHash);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
|
uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -45,12 +45,6 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SmlExecHandle {
|
|
||||||
SHashObj* pBlockHash;
|
|
||||||
SQuery* pQuery;
|
|
||||||
} SSmlExecHandle;
|
|
||||||
|
|
||||||
|
|
||||||
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
|
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
|
||||||
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
|
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
|
||||||
if (NULL == pUseCols) {
|
if (NULL == pUseCols) {
|
||||||
|
@ -164,11 +158,10 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
|
||||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
|
|
||||||
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
|
|
||||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||||
SBoundColInfo bindTags = {0};
|
SBoundColInfo bindTags = {0};
|
||||||
SVCreateTbReq *pCreateTblReq = NULL;
|
SVCreateTbReq *pCreateTblReq = NULL;
|
||||||
|
@ -200,7 +193,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
||||||
memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
||||||
|
|
||||||
STableDataCxt* pTableCxt = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
ret = insGetTableDataCxt(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||||
pTableMeta, &pCreateTblReq, &pTableCxt, false);
|
pTableMeta, &pCreateTblReq, &pTableCxt, false);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
|
buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
|
||||||
|
@ -247,12 +240,12 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
||||||
}
|
}
|
||||||
if (kv->type == TSDB_DATA_TYPE_NCHAR){
|
if (kv->type == TSDB_DATA_TYPE_NCHAR){
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
char* pUcs4 = taosMemoryCalloc(1, pSchema->bytes - VARSTR_HEADER_SIZE);
|
char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
|
||||||
if (NULL == pUcs4) {
|
if (NULL == pUcs4) {
|
||||||
ret = TSDB_CODE_OUT_OF_MEMORY;
|
ret = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
||||||
if (errno == E2BIG) {
|
if (errno == E2BIG) {
|
||||||
buildInvalidOperationMsg(&pBuf, "value too long");
|
buildInvalidOperationMsg(&pBuf, "value too long");
|
||||||
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
||||||
|
@ -271,6 +264,14 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
||||||
}
|
}
|
||||||
pVal->flag = CV_FLAG_VALUE;
|
pVal->flag = CV_FLAG_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
|
ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
|
if (TSDB_CODE_SUCCESS != ret) {
|
||||||
|
buildInvalidOperationMsg(&pBuf, "tRowBuild error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
@ -280,23 +281,42 @@ end:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* smlInitHandle(SQuery* pQuery) {
|
SQuery* smlInitHandle() {
|
||||||
SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
|
SQuery *pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
|
||||||
if (!handle) return NULL;
|
if (NULL == pQuery) {
|
||||||
handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
uError("create pQuery error");
|
||||||
handle->pQuery = pQuery;
|
return NULL;
|
||||||
|
}
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
|
pQuery->haveResultSet = false;
|
||||||
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
|
SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
|
if (NULL == stmt) {
|
||||||
|
uError("create SVnodeModifOpStmt error");
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
stmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
|
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
|
||||||
|
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
|
||||||
|
|
||||||
return handle;
|
pQuery->pRoot = (SNode *)stmt;
|
||||||
|
return pQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
void smlDestroyHandle(void* pHandle) {
|
int32_t smlBuildOutput(SQuery * handle, SHashObj* pVgHash) {
|
||||||
if (!pHandle) return;
|
SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)(handle)->pRoot;
|
||||||
SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
|
// merge according to vgId
|
||||||
insDestroyBlockHashmap(handle->pBlockHash);
|
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
|
||||||
taosMemoryFree(handle);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
}
|
uError("insMergeTableDataCxt failed");
|
||||||
|
return code;
|
||||||
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
|
}
|
||||||
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
|
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
|
||||||
return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("insBuildVgDataBlocks failed");
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1083,8 +1083,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
||||||
|
|
||||||
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
|
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
|
||||||
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) {
|
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) {
|
||||||
*pTableCxt = taosHashGet(pHash, id, idLen);
|
STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
|
||||||
if (NULL != *pTableCxt) {
|
if (NULL != tmp) {
|
||||||
|
*pTableCxt = *tmp;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
|
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
|
||||||
|
|
Loading…
Reference in New Issue