Merge branch 'refact/submit_req' into refact/TD-20895-3.0
This commit is contained in:
commit
e7c2c62e5c
|
@ -106,11 +106,10 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
|
||||||
|
|
||||||
void qDestroyBoundColInfo(void* pInfo);
|
void qDestroyBoundColInfo(void* pInfo);
|
||||||
|
|
||||||
void* smlInitHandle(SQuery* pQuery);
|
SQuery* smlInitHandle();
|
||||||
void smlDestroyHandle(void* pHandle);
|
int32_t smlBindData(SQuery* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
||||||
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
|
||||||
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
|
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash);
|
||||||
|
|
||||||
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);
|
||||||
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap);
|
||||||
|
|
|
@ -170,7 +170,6 @@ typedef struct {
|
||||||
SHashObj *childTables;
|
SHashObj *childTables;
|
||||||
SHashObj *superTables;
|
SHashObj *superTables;
|
||||||
SHashObj *pVgHash;
|
SHashObj *pVgHash;
|
||||||
void *exec;
|
|
||||||
|
|
||||||
STscObj *taos;
|
STscObj *taos;
|
||||||
SCatalog *pCatalog;
|
SCatalog *pCatalog;
|
||||||
|
@ -712,21 +711,21 @@ static bool smlParseBool(SSmlKv *kvVal) {
|
||||||
const char *pVal = kvVal->value;
|
const char *pVal = kvVal->value;
|
||||||
int32_t len = kvVal->length;
|
int32_t len = kvVal->length;
|
||||||
if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
|
if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
|
||||||
kvVal->i = true;
|
kvVal->i = TSDB_TRUE;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
|
if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
|
||||||
kvVal->i = false;
|
kvVal->i = TSDB_FALSE;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((len == 4) && !strncasecmp(pVal, "true", len)) {
|
if ((len == 4) && !strncasecmp(pVal, "true", len)) {
|
||||||
kvVal->i = true;
|
kvVal->i = TSDB_TRUE;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if ((len == 5) && !strncasecmp(pVal, "false", len)) {
|
if ((len == 5) && !strncasecmp(pVal, "false", len)) {
|
||||||
kvVal->i = false;
|
kvVal->i = TSDB_FALSE;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -1488,7 +1487,6 @@ static void smlDestroyCols(SArray *cols) {
|
||||||
static void smlDestroyInfo(SSmlHandle *info) {
|
static void smlDestroyInfo(SSmlHandle *info) {
|
||||||
if (!info) return;
|
if (!info) return;
|
||||||
qDestroyQuery(info->pQuery);
|
qDestroyQuery(info->pQuery);
|
||||||
smlDestroyHandle(info->exec);
|
|
||||||
|
|
||||||
// destroy info->childTables
|
// destroy info->childTables
|
||||||
void **p1 = (void **)taosHashIterate(info->childTables, NULL);
|
void **p1 = (void **)taosHashIterate(info->childTables, NULL);
|
||||||
|
@ -1526,19 +1524,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
}
|
}
|
||||||
info->id = smlGenId();
|
info->id = smlGenId();
|
||||||
|
|
||||||
info->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
|
info->pQuery = smlInitHandle();
|
||||||
if (NULL == info->pQuery) {
|
|
||||||
uError("SML:0x%" PRIx64 " create info->pQuery error", info->id);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
|
||||||
info->pQuery->haveResultSet = false;
|
|
||||||
info->pQuery->msgType = TDMT_VND_SUBMIT;
|
|
||||||
info->pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
|
||||||
if (NULL == info->pQuery->pRoot) {
|
|
||||||
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
|
|
||||||
goto cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTscObj) {
|
if (pTscObj) {
|
||||||
info->taos = pTscObj;
|
info->taos = pTscObj;
|
||||||
|
@ -1561,10 +1547,8 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
info->pRequest = request;
|
info->pRequest = request;
|
||||||
info->msgBuf.buf = info->pRequest->msgBuf;
|
info->msgBuf.buf = info->pRequest->msgBuf;
|
||||||
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
|
||||||
info->pRequest->stmtType = info->pQuery->pRoot->type;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info->exec = smlInitHandle(info->pQuery);
|
|
||||||
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
@ -1577,7 +1561,7 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (NULL == info->exec || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
|
if (NULL == info->pQuery || NULL == info->childTables || NULL == info->superTables || NULL == info->pVgHash ||
|
||||||
NULL == info->dumplicateKey) {
|
NULL == info->dumplicateKey) {
|
||||||
uError("SML:0x%" PRIx64 " create info failed", info->id);
|
uError("SML:0x%" PRIx64 " create info failed", info->id);
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
|
@ -2337,7 +2321,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
(*pMeta)->tableMeta->vgId = vg.vgId;
|
(*pMeta)->tableMeta->vgId = vg.vgId;
|
||||||
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
||||||
|
|
||||||
code = smlBindData(info->exec, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
|
code = smlBindData(info->pQuery, tableData->tags, (*pMeta)->cols, tableData->cols, info->dataFormat,
|
||||||
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
|
||||||
info->ttl, info->msgBuf.buf, info->msgBuf.len);
|
info->ttl, info->msgBuf.buf, info->msgBuf.len);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2347,7 +2331,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
|
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = smlBuildOutput(info->exec, info->pVgHash);
|
code = smlBuildOutput(info->pQuery, info->pVgHash);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
|
uError("SML:0x%" PRIx64 " smlBuildOutput failed", info->id);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -162,7 +162,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
|
||||||
ntp = sizeof(SRow);
|
ntp = sizeof(SRow);
|
||||||
break;
|
break;
|
||||||
case HAS_VALUE:
|
case HAS_VALUE:
|
||||||
ntp = sizeof(SRow) + pTSchema->flen;
|
ntp = sizeof(SRow) + pTSchema->flen + ntp;
|
||||||
break;
|
break;
|
||||||
case (HAS_NULL | HAS_NONE):
|
case (HAS_NULL | HAS_NONE):
|
||||||
ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1);
|
ntp = sizeof(SRow) + BIT1_SIZE(pTSchema->numOfCols - 1);
|
||||||
|
|
|
@ -909,6 +909,29 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||||
|
int32_t nColData = TARRAY_SIZE(pSubmitTbData->aCol);
|
||||||
|
SColData *aColData = (SColData *)TARRAY_DATA(pSubmitTbData->aCol);
|
||||||
|
|
||||||
|
if (nColData <= 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (aColData[0].cid != PRIMARYKEY_TIMESTAMP_COL_ID || aColData[0].type != TSDB_DATA_TYPE_TIMESTAMP ||
|
||||||
|
aColData[0].nVal <= 0) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 1; i < nColData; i++) {
|
||||||
|
if (aColData[i].nVal != aColData[0].nVal) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// loop to handle
|
// loop to handle
|
||||||
|
|
|
@ -167,6 +167,7 @@ int32_t insInitBoundColsInfo(int32_t numOfBound, SBoundColInfo *pInfo);
|
||||||
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
|
void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
|
||||||
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
||||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode);
|
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode);
|
||||||
|
int32_t initTableColSubmitData(STableDataCxt* pTableCxt);
|
||||||
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
|
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
|
||||||
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
|
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
|
||||||
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
|
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
|
||||||
|
@ -174,5 +175,5 @@ void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
|
||||||
void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
|
void insDestroyVgroupDataCxtList(SArray *pVgCxtList);
|
||||||
void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash);
|
void insDestroyVgroupDataCxtHashMap(SHashObj *pVgCxtHash);
|
||||||
void insDestroyTableDataCxt(STableDataCxt* pTableCxt);
|
void insDestroyTableDataCxt(STableDataCxt* pTableCxt);
|
||||||
|
void destroyBoundColInfo(SBoundColInfo* pInfo);
|
||||||
#endif // TDENGINE_PAR_INSERT_UTIL_H
|
#endif // TDENGINE_PAR_INSERT_UTIL_H
|
||||||
|
|
|
@ -45,95 +45,46 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SmlExecTableHandle {
|
static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSchema* pSchema, bool isTag) {
|
||||||
SParsedDataColInfo tags; // each table
|
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
|
||||||
SVCreateTbReq createTblReq; // each table
|
if (NULL == pUseCols) {
|
||||||
} SmlExecTableHandle;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
typedef struct SmlExecHandle {
|
|
||||||
SHashObj* pBlockHash;
|
|
||||||
SmlExecTableHandle tableExecHandle;
|
|
||||||
SQuery* pQuery;
|
|
||||||
} SSmlExecHandle;
|
|
||||||
|
|
||||||
static void smlDestroyTableHandle(void* pHandle) {
|
|
||||||
SmlExecTableHandle* handle = (SmlExecTableHandle*)pHandle;
|
|
||||||
destroyBoundColumnInfo(&handle->tags);
|
|
||||||
tdDestroySVCreateTbReq(&handle->createTblReq);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) {
|
pBoundInfo->numOfBound = 0;
|
||||||
col_id_t nCols = pColList->numOfCols;
|
int16_t lastColIdx = -1; // last column found
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
pColList->numOfBound = 0;
|
|
||||||
pColList->boundNullLen = 0;
|
|
||||||
memset(pColList->boundColumns, 0, sizeof(col_id_t) * nCols);
|
|
||||||
for (col_id_t i = 0; i < nCols; ++i) {
|
|
||||||
pColList->cols[i].valStat = VAL_STAT_NONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isOrdered = true;
|
|
||||||
col_id_t lastColIdx = -1; // last column found
|
|
||||||
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
|
||||||
SSmlKv* kv = taosArrayGetP(cols, i);
|
SSmlKv* kv = taosArrayGetP(cols, i);
|
||||||
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
||||||
col_id_t t = lastColIdx + 1;
|
col_id_t t = lastColIdx + 1;
|
||||||
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, nCols, pSchema));
|
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
|
||||||
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, nCols);
|
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
|
||||||
if (index < 0 && t > 0) {
|
if (index < 0 && t > 0) {
|
||||||
index = insFindCol(&sToken, 0, t, pSchema);
|
index = insFindCol(&sToken, 0, t, pSchema);
|
||||||
isOrdered = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index < 0) {
|
if (index < 0) {
|
||||||
uError("smlBoundColumnData. index:%d", index);
|
uError("smlBoundColumnData. index:%d", index);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
if (pColList->cols[index].valStat == VAL_STAT_HAS) {
|
if (pUseCols[index]) {
|
||||||
uError("smlBoundColumnData. already set. index:%d", index);
|
uError("smlBoundColumnData. already set. index:%d", index);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
code = TSDB_CODE_SML_INVALID_DATA;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
lastColIdx = index;
|
lastColIdx = index;
|
||||||
pColList->cols[index].valStat = VAL_STAT_HAS;
|
pUseCols[index] = true;
|
||||||
pColList->boundColumns[pColList->numOfBound] = index;
|
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
|
||||||
++pColList->numOfBound;
|
++pBoundInfo->numOfBound;
|
||||||
switch (pSchema[t].type) {
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
|
||||||
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
|
end:
|
||||||
|
taosMemoryFree(pUseCols);
|
||||||
|
|
||||||
if (!isOrdered) {
|
return code;
|
||||||
pColList->colIdxInfo = taosMemoryCalloc(pColList->numOfBound, sizeof(SBoundIdxInfo));
|
|
||||||
if (NULL == pColList->colIdxInfo) {
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
SBoundIdxInfo* pColIdx = pColList->colIdxInfo;
|
|
||||||
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
|
|
||||||
pColIdx[i].schemaColIdx = pColList->boundColumns[i];
|
|
||||||
pColIdx[i].boundIdx = i;
|
|
||||||
}
|
|
||||||
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insSchemaIdxCompar);
|
|
||||||
for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
|
|
||||||
pColIdx[i].finalIdx = i;
|
|
||||||
}
|
|
||||||
taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), insBoundIdxCompar);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pColList->numOfCols > pColList->numOfBound) {
|
|
||||||
memset(&pColList->boundColumns[pColList->numOfBound], 0,
|
|
||||||
sizeof(col_id_t) * (pColList->numOfCols - pColList->numOfBound));
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -146,7 +97,7 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
|
||||||
* @param msg
|
* @param msg
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
|
static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchema, STag** ppTag, SArray** tagName,
|
||||||
SMsgBuf* msg) {
|
SMsgBuf* msg) {
|
||||||
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
|
||||||
if (!pTagArray) {
|
if (!pTagArray) {
|
||||||
|
@ -159,7 +110,7 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
for (int i = 0; i < tags->numOfBound; ++i) {
|
for (int i = 0; i < tags->numOfBound; ++i) {
|
||||||
SSchema* pTagSchema = &pSchema[tags->boundColumns[i]];
|
SSchema* pTagSchema = &pSchema[tags->pColIndex[i]];
|
||||||
SSmlKv* kv = taosArrayGetP(cols, i);
|
SSmlKv* kv = taosArrayGetP(cols, i);
|
||||||
|
|
||||||
taosArrayPush(*tagName, pTagSchema->name);
|
taosArrayPush(*tagName, pTagSchema->name);
|
||||||
|
@ -207,153 +158,165 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
int32_t smlBindData(SQuery* query, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
|
||||||
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
|
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen) {
|
||||||
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
|
||||||
|
|
||||||
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
|
|
||||||
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
|
|
||||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||||
insSetBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
|
SBoundColInfo bindTags = {0};
|
||||||
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true);
|
SVCreateTbReq *pCreateTblReq = NULL;
|
||||||
|
SArray* tagName = NULL;
|
||||||
|
|
||||||
|
insInitBoundColsInfo(getNumOfTags(pTableMeta), &bindTags);
|
||||||
|
int ret = smlBoundColumnData(tags, &bindTags, pTagsSchema, true);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
buildInvalidOperationMsg(&pBuf, "bound tags error");
|
buildInvalidOperationMsg(&pBuf, "bound tags error");
|
||||||
return ret;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
STag* pTag = NULL;
|
STag* pTag = NULL;
|
||||||
SArray* tagName = NULL;
|
|
||||||
ret = smlBuildTagRow(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, &pTag, &tagName, &pBuf);
|
ret = smlBuildTagRow(tags, &bindTags, pTagsSchema, &pTag, &tagName, &pBuf);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(tagName);
|
goto end;
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
insBuildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
|
pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||||
|
if (NULL == pCreateTblReq) {
|
||||||
|
ret = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
insBuildCreateTbReq(pCreateTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName,
|
||||||
pTableMeta->tableInfo.numOfTags, ttl);
|
pTableMeta->tableInfo.numOfTags, ttl);
|
||||||
taosArrayDestroy(tagName);
|
|
||||||
|
|
||||||
smlHandle->tableExecHandle.createTblReq.ctb.stbName = taosMemoryMalloc(sTableNameLen + 1);
|
pCreateTblReq->ctb.stbName = taosMemoryCalloc(1, sTableNameLen + 1);
|
||||||
memcpy(smlHandle->tableExecHandle.createTblReq.ctb.stbName, sTableName, sTableNameLen);
|
memcpy(pCreateTblReq->ctb.stbName, sTableName, sTableNameLen);
|
||||||
smlHandle->tableExecHandle.createTblReq.ctb.stbName[sTableNameLen] = 0;
|
|
||||||
|
|
||||||
STableDataBlocks* pDataBlock = NULL;
|
STableDataCxt* pTableCxt = NULL;
|
||||||
ret = insGetDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
ret = insGetTableDataCxt(((SVnodeModifOpStmt *)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid),
|
||||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize,
|
pTableMeta, &pCreateTblReq, &pTableCxt, false);
|
||||||
pTableMeta, &pDataBlock, NULL, &smlHandle->tableExecHandle.createTblReq);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
buildInvalidOperationMsg(&pBuf, "create data block error");
|
buildInvalidOperationMsg(&pBuf, "insGetTableDataCxt error");
|
||||||
return ret;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||||
|
ret = smlBoundColumnData(colsSchema, &pTableCxt->boundColsInfo, pSchema, false);
|
||||||
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
buildInvalidOperationMsg(&pBuf, "bound cols error");
|
buildInvalidOperationMsg(&pBuf, "bound cols error");
|
||||||
return ret;
|
goto end;
|
||||||
}
|
}
|
||||||
int32_t extendedRowSize = insGetExtendedRowSize(pDataBlock);
|
|
||||||
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
|
|
||||||
SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
|
|
||||||
SMemParam param = {.rb = pBuilder};
|
|
||||||
|
|
||||||
insInitRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo);
|
ret = initTableColSubmitData(pTableCxt);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
buildInvalidOperationMsg(&pBuf, "initTableColSubmitData error");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t rowNum = taosArrayGetSize(cols);
|
int32_t rowNum = taosArrayGetSize(cols);
|
||||||
if (rowNum <= 0) {
|
if (rowNum <= 0) {
|
||||||
return buildInvalidOperationMsg(&pBuf, "cols size <= 0");
|
ret = buildInvalidOperationMsg(&pBuf, "cols size <= 0");
|
||||||
}
|
goto end;
|
||||||
ret = insAllocateMemForSize(pDataBlock, extendedRowSize * rowNum);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
buildInvalidOperationMsg(&pBuf, "allocate memory error");
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t r = 0; r < rowNum; ++r) {
|
for (int32_t r = 0; r < rowNum; ++r) {
|
||||||
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
|
|
||||||
tdSRowResetBuf(pBuilder, row);
|
|
||||||
void* rowData = taosArrayGetP(cols, r);
|
void* rowData = taosArrayGetP(cols, r);
|
||||||
size_t rowDataSize = 0;
|
|
||||||
if (format) {
|
|
||||||
rowDataSize = taosArrayGetSize(rowData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. set the parsed value from sql string
|
// 1. set the parsed value from sql string
|
||||||
for (int c = 0, j = 0; c < spd->numOfBound; ++c) {
|
for (int c = 0; c < pTableCxt->boundColsInfo.numOfBound; ++c) {
|
||||||
SSchema* pColSchema = &pSchema[spd->boundColumns[c]];
|
SSchema* pColSchema = &pSchema[pTableCxt->boundColsInfo.pColIndex[c]];
|
||||||
|
SColVal* pVal = taosArrayGet(pTableCxt->pValues, pTableCxt->boundColsInfo.pColIndex[c]);
|
||||||
param.schema = pColSchema;
|
|
||||||
insGetSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx);
|
|
||||||
|
|
||||||
SSmlKv* kv = NULL;
|
SSmlKv* kv = NULL;
|
||||||
if (format) {
|
if (!format){
|
||||||
if (j < rowDataSize) {
|
|
||||||
kv = taosArrayGetP(rowData, j);
|
|
||||||
if (rowDataSize != spd->numOfBound && j != 0 &&
|
|
||||||
(kv->keyLen != strlen(pColSchema->name) || strncmp(kv->key, pColSchema->name, kv->keyLen) != 0)) {
|
|
||||||
kv = NULL;
|
|
||||||
} else {
|
|
||||||
j++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
void** p = taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name));
|
||||||
if (p) kv = *p;
|
if (p) kv = *p;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (kv) {
|
if (kv == NULL) {
|
||||||
int32_t colLen = kv->length;
|
continue;
|
||||||
|
}
|
||||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
uDebug("SML:data before:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
|
||||||
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
||||||
uDebug("SML:data after:%" PRId64 ", precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
|
||||||
}
|
}
|
||||||
|
if (kv->type == TSDB_DATA_TYPE_NCHAR){
|
||||||
if (IS_VAR_DATA_TYPE(kv->type)) {
|
int32_t len = 0;
|
||||||
insMemRowAppend(&pBuf, kv->value, colLen, ¶m);
|
char* pUcs4 = taosMemoryCalloc(1, pColSchema->bytes - VARSTR_HEADER_SIZE);
|
||||||
|
if (NULL == pUcs4) {
|
||||||
|
ret = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
|
||||||
|
if (errno == E2BIG) {
|
||||||
|
buildInvalidOperationMsg(&pBuf, "value too long");
|
||||||
|
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
ret = buildInvalidOperationMsg(&pBuf, strerror(errno));
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
pVal->value.pData = pUcs4;
|
||||||
|
pVal->value.nData = len;
|
||||||
|
} else if(kv->type == TSDB_DATA_TYPE_BINARY) {
|
||||||
|
pVal->value.nData = kv->length;
|
||||||
|
pVal->value.pData = (uint8_t *)kv->value;
|
||||||
} else {
|
} else {
|
||||||
insMemRowAppend(&pBuf, &(kv->value), colLen, ¶m);
|
memcpy(&pVal->value.val, &(kv->value), kv->length);
|
||||||
}
|
}
|
||||||
} else {
|
pVal->flag = CV_FLAG_VALUE;
|
||||||
pBuilder->hasNone = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == pColSchema->colId) {
|
SRow** pRow = taosArrayReserve(pTableCxt->pData->aRowP, 1);
|
||||||
TSKEY tsKey = TD_ROW_KEY(row);
|
ret = tRowBuild(pTableCxt->pValues, pTableCxt->pSchema, pRow);
|
||||||
insCheckTimestamp(pDataBlock, (const char*)&tsKey);
|
if (TSDB_CODE_SUCCESS != ret) {
|
||||||
|
buildInvalidOperationMsg(&pBuf, "tRowBuild error");
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
|
insCheckTableDataOrder(pTableCxt, TD_ROW_KEY(*pRow));
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the null value for the columns that do not assign values
|
end:
|
||||||
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
destroyBoundColInfo(&bindTags);
|
||||||
pBuilder->hasNone = true;
|
taosMemoryFree(pCreateTblReq);
|
||||||
|
taosArrayDestroy(tagName);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdSRowEnd(pBuilder);
|
SQuery* smlInitHandle() {
|
||||||
pDataBlock->size += extendedRowSize;
|
SQuery *pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
|
||||||
|
if (NULL == pQuery) {
|
||||||
|
uError("create pQuery error");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
|
pQuery->haveResultSet = false;
|
||||||
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
|
SVnodeModifOpStmt *stmt = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
|
if (NULL == stmt) {
|
||||||
|
uError("create SVnodeModifOpStmt error");
|
||||||
|
qDestroyQuery(pQuery);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
stmt->pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
stmt->pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
|
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
|
||||||
|
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
|
||||||
|
|
||||||
|
pQuery->pRoot = (SNode *)stmt;
|
||||||
|
return pQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
|
int32_t smlBuildOutput(SQuery * handle, SHashObj* pVgHash) {
|
||||||
return insSetBlockInfo(pBlocks, pDataBlock, rowNum, &pBuf);
|
SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)(handle)->pRoot;
|
||||||
|
// merge according to vgId
|
||||||
|
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("insMergeTableDataCxt failed");
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
code = insBuildVgDataBlocks(pVgHash, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
|
||||||
void* smlInitHandle(SQuery* pQuery) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SSmlExecHandle* handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle));
|
uError("insBuildVgDataBlocks failed");
|
||||||
if (!handle) return NULL;
|
return code;
|
||||||
handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
|
||||||
handle->pQuery = pQuery;
|
|
||||||
|
|
||||||
return handle;
|
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
void smlDestroyHandle(void* pHandle) {
|
|
||||||
if (!pHandle) return;
|
|
||||||
SSmlExecHandle* handle = (SSmlExecHandle*)pHandle;
|
|
||||||
insDestroyBlockHashmap(handle->pBlockHash);
|
|
||||||
smlDestroyTableHandle(&handle->tableExecHandle);
|
|
||||||
taosMemoryFree(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) {
|
|
||||||
SSmlExecHandle* smlHandle = (SSmlExecHandle*)handle;
|
|
||||||
return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1371,8 +1371,6 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
|
|
||||||
|
|
||||||
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||||
destroyBoundColInfo(&pCxt->tags);
|
destroyBoundColInfo(&pCxt->tags);
|
||||||
taosMemoryFreeClear(pStmt->pTableMeta);
|
taosMemoryFreeClear(pStmt->pTableMeta);
|
||||||
|
|
|
@ -22,13 +22,6 @@
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
|
||||||
typedef struct SKvParam {
|
|
||||||
int16_t pos;
|
|
||||||
SArray* pTagVals;
|
|
||||||
SSchema* schema;
|
|
||||||
char buf[TSDB_MAX_TAGS_LEN];
|
|
||||||
} SKvParam;
|
|
||||||
|
|
||||||
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SArray* pVgDataBlocks = NULL;
|
SArray* pVgDataBlocks = NULL;
|
||||||
|
|
|
@ -340,8 +340,8 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
|
||||||
|
|
||||||
void** p1 = taosHashIterate(pDataBlockHash, NULL);
|
void** p1 = taosHashIterate(pDataBlockHash, NULL);
|
||||||
while (p1) {
|
while (p1) {
|
||||||
STableDataBlocks* pBlocks = *p1;
|
SBoundColInfo* pBlocks = *p1;
|
||||||
insDestroyDataBlock(pBlocks);
|
destroyBoundColInfo(pBlocks);
|
||||||
|
|
||||||
p1 = taosHashIterate(pDataBlockHash, p1);
|
p1 = taosHashIterate(pDataBlockHash, p1);
|
||||||
}
|
}
|
||||||
|
@ -1083,8 +1083,9 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat
|
||||||
|
|
||||||
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
|
int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* pTableMeta,
|
||||||
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) {
|
SVCreateTbReq** pCreateTbReq, STableDataCxt** pTableCxt, bool colMode) {
|
||||||
*pTableCxt = taosHashGet(pHash, id, idLen);
|
STableDataCxt** tmp = (STableDataCxt**)taosHashGet(pHash, id, idLen);
|
||||||
if (NULL != *pTableCxt) {
|
if (NULL != tmp) {
|
||||||
|
*pTableCxt = *tmp;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
|
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
|
||||||
|
|
Loading…
Reference in New Issue