Merge pull request #11985 from taosdata/feature/qnode
fix: fix taos shell crash issue
This commit is contained in:
commit
e79850164b
|
@ -82,6 +82,8 @@ extern char *qtypeStr[];
|
|||
|
||||
#define TSDB_PORT_HTTP 11
|
||||
|
||||
#undef TD_DEBUG_PRINT_ROW
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -214,6 +214,7 @@ STSRow *tdRowDup(STSRow *row);
|
|||
static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, col_id_t idx) {
|
||||
return (SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int16_t tdKvRowColIdAt(STSRow *pRow, col_id_t idx) {
|
||||
ASSERT(idx >= 0);
|
||||
if (idx == 0) {
|
||||
|
@ -222,6 +223,7 @@ static FORCE_INLINE int16_t tdKvRowColIdAt(STSRow *pRow, col_id_t idx) {
|
|||
|
||||
return ((SKvRowIdx *)TD_ROW_COL_IDX(pRow) + idx - 1)->colId;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return POINTER_SHIFT(pRow, pIdx->offset); }
|
||||
|
||||
#define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow?
|
||||
|
@ -1117,7 +1119,7 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
|
|||
}
|
||||
|
||||
if (!colFound) {
|
||||
if(colId <= pIter->maxColId) {
|
||||
if (colId <= pIter->maxColId) {
|
||||
pVal->valType = TD_VTYPE_NONE;
|
||||
return true;
|
||||
} else {
|
||||
|
@ -1367,14 +1369,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema) {
|
||||
static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) {
|
||||
STSRowIter iter = {0};
|
||||
tdSTSRowIterInit(&iter, pSchema);
|
||||
tdSTSRowIterReset(&iter, row);
|
||||
printf(">>>");
|
||||
printf("%s >>>", tag);
|
||||
for (int i = 0; i < pSchema->numOfCols; ++i) {
|
||||
STColumn *stCol = pSchema->columns + i;
|
||||
SCellVal sVal = { 255, NULL};
|
||||
SCellVal sVal = {.valType = 255, .val = NULL};
|
||||
if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -340,7 +340,7 @@ typedef struct SQueryPlan {
|
|||
int32_t numOfSubplans;
|
||||
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
|
||||
SExplainInfo explainInfo;
|
||||
SNodeList* pPlaceholderValues;
|
||||
SArray* pPlaceholderValues;
|
||||
} SQueryPlan;
|
||||
|
||||
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext);
|
||||
|
|
|
@ -73,6 +73,7 @@ typedef struct SQuery {
|
|||
SArray* pDbList;
|
||||
SArray* pTableList;
|
||||
bool showRewrite;
|
||||
int32_t placeholderNum;
|
||||
} SQuery;
|
||||
|
||||
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef struct SPlanContext {
|
|||
bool showRewrite;
|
||||
int8_t triggerType;
|
||||
int64_t watermark;
|
||||
bool isStmtQuery;
|
||||
int32_t placeholderNum;
|
||||
void* pTransporter;
|
||||
struct SCatalog* pCatalog;
|
||||
char* pMsg;
|
||||
|
@ -50,7 +50,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
|
|||
// @pSource one execution location of this group of datasource subplans
|
||||
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
|
||||
|
||||
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx);
|
||||
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId);
|
||||
|
||||
// Convert to subplan to string for the scheduler to send to the executor
|
||||
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
|
||||
|
|
|
@ -46,6 +46,12 @@ typedef struct SStmtTableCache {
|
|||
void* boundTags;
|
||||
} SStmtTableCache;
|
||||
|
||||
typedef struct SQueryFields {
|
||||
TAOS_FIELD* fields;
|
||||
TAOS_FIELD* userFields;
|
||||
uint32_t numOfCols;
|
||||
} SQueryFields;
|
||||
|
||||
typedef struct SStmtBindInfo {
|
||||
bool needParse;
|
||||
uint64_t tbUid;
|
||||
|
@ -66,16 +72,17 @@ typedef struct SStmtExecInfo {
|
|||
} SStmtExecInfo;
|
||||
|
||||
typedef struct SStmtSQLInfo {
|
||||
STMT_TYPE type;
|
||||
STMT_STATUS status;
|
||||
bool autoCreate;
|
||||
uint64_t runTimes;
|
||||
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
||||
SQuery* pQuery;
|
||||
char* sqlStr;
|
||||
int32_t sqlLen;
|
||||
SArray* nodeList;
|
||||
SQueryPlan* pQueryPlan;
|
||||
STMT_TYPE type;
|
||||
STMT_STATUS status;
|
||||
bool autoCreate;
|
||||
uint64_t runTimes;
|
||||
SHashObj* pTableCache; //SHash<SStmtTableCache>
|
||||
SQuery* pQuery;
|
||||
char* sqlStr;
|
||||
int32_t sqlLen;
|
||||
SArray* nodeList;
|
||||
SQueryPlan* pQueryPlan;
|
||||
SQueryFields fields;
|
||||
} SStmtSQLInfo;
|
||||
|
||||
typedef struct STscStmt {
|
||||
|
|
|
@ -234,7 +234,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
|
|||
.showRewrite = pQuery->showRewrite,
|
||||
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.placeholderNum = pQuery->placeholderNum};
|
||||
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||
|
|
|
@ -73,6 +73,22 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
|
||||
SQueryFields *pFields = &pStmt->sql.fields;
|
||||
int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD);
|
||||
|
||||
pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
|
||||
pFields->fields = taosMemoryMalloc(size);
|
||||
pFields->userFields = taosMemoryMalloc(size);
|
||||
if (NULL == pFields->fields || NULL == pFields->userFields) {
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
||||
}
|
||||
memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size);
|
||||
memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
|
||||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
|
@ -258,37 +274,42 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
|
|||
STableMeta *pTableMeta = NULL;
|
||||
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
|
||||
|
||||
if (pTableMeta->uid == pStmt->bInfo.tbUid) {
|
||||
uint64_t uid = pTableMeta->uid;
|
||||
uint64_t suid = pTableMeta->suid;
|
||||
int8_t tableType = pTableMeta->tableType;
|
||||
taosMemoryFree(pTableMeta);
|
||||
|
||||
if (uid == pStmt->bInfo.tbUid) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
|
||||
if (taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid))) {
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
|
||||
if (NULL == pCache) {
|
||||
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid);
|
||||
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", uid);
|
||||
|
||||
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
|
||||
}
|
||||
|
||||
pStmt->bInfo.needParse = false;
|
||||
|
||||
pStmt->bInfo.tbUid = pTableMeta->uid;
|
||||
pStmt->bInfo.tbSuid = pTableMeta->suid;
|
||||
pStmt->bInfo.tbType = pTableMeta->tableType;
|
||||
pStmt->bInfo.tbUid = uid;
|
||||
pStmt->bInfo.tbSuid = suid;
|
||||
pStmt->bInfo.tbType = tableType;
|
||||
pStmt->bInfo.boundTags = pCache->boundTags;
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
|
||||
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
|
||||
if (pCache) {
|
||||
pStmt->bInfo.needParse = false;
|
||||
|
||||
pStmt->bInfo.tbUid = pTableMeta->uid;
|
||||
pStmt->bInfo.tbSuid = pTableMeta->suid;
|
||||
pStmt->bInfo.tbType = pTableMeta->tableType;
|
||||
pStmt->bInfo.tbUid = uid;
|
||||
pStmt->bInfo.tbSuid = suid;
|
||||
pStmt->bInfo.tbType = tableType;
|
||||
pStmt->bInfo.boundTags = pCache->boundTags;
|
||||
|
||||
STableDataBlocks* pNewBlock = NULL;
|
||||
|
@ -475,9 +496,10 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
|
|||
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
|
||||
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
|
||||
pStmt->exec.pRequest->body.pDag = NULL;
|
||||
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
|
||||
}
|
||||
|
||||
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx));
|
||||
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId));
|
||||
}
|
||||
|
||||
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
|
||||
|
@ -549,6 +571,8 @@ int stmtClose(TAOS_STMT *stmt) {
|
|||
STscStmt* pStmt = (STscStmt*)stmt;
|
||||
|
||||
STMT_RET(stmtCleanSQLInfo(pStmt));
|
||||
|
||||
taosMemoryFree(stmt);
|
||||
}
|
||||
|
||||
const char *stmtErrstr(TAOS_STMT *stmt) {
|
||||
|
@ -601,7 +625,7 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
|
|||
pStmt->exec.pRequest->body.pDag = NULL;
|
||||
}
|
||||
|
||||
*nums = (pStmt->sql.pQueryPlan->pPlaceholderValues) ? pStmt->sql.pQueryPlan->pPlaceholderValues->length : 0;
|
||||
*nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);
|
||||
} else {
|
||||
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
|
||||
}
|
||||
|
|
|
@ -1490,6 +1490,10 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
||||
}
|
||||
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
tdSRowPrint(row1, pSchema1, __func__);
|
||||
#endif
|
||||
|
||||
if (isRow1DataRow) {
|
||||
numOfColsOfRow1 = schemaNCols(pSchema1);
|
||||
} else {
|
||||
|
|
|
@ -423,6 +423,9 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
|
|||
EDealRes res = DEAL_RES_CONTINUE;
|
||||
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_NODE_LIST:
|
||||
res = walkPhysiPlans(((SNodeListNode*)pNode)->pNodeList, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
|
@ -534,10 +537,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
|
|||
break;
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN: {
|
||||
SSubplan* pSubplan = (SSubplan*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pSubplan->pChildren, order, walker, pContext);
|
||||
}
|
||||
res = walkPhysiPlans(pSubplan->pChildren, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan((SNode*)pSubplan->pNode, order, walker, pContext);
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) {
|
|||
pCxt->notSupport = false;
|
||||
pCxt->valid = true;
|
||||
pCxt->pRootNode = NULL;
|
||||
pCxt->placeholderNo = 1;
|
||||
pCxt->placeholderNo = 0;
|
||||
}
|
||||
|
||||
static void copyStringFormStringToken(SToken* pToken, char* pBuf, int32_t len) {
|
||||
|
@ -315,7 +315,7 @@ SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLitera
|
|||
CHECK_OUT_OF_MEM(val);
|
||||
val->literal = strndup(pLiteral->z, pLiteral->n);
|
||||
CHECK_OUT_OF_MEM(val->literal);
|
||||
val->placeholderNo = pCxt->placeholderNo++;
|
||||
val->placeholderNo = ++pCxt->placeholderNo;
|
||||
return (SNode*)val;
|
||||
}
|
||||
|
||||
|
@ -380,6 +380,11 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d
|
|||
CHECK_OUT_OF_MEM(func);
|
||||
strcpy(func->functionName, "cast");
|
||||
func->node.resType = dt;
|
||||
if (TSDB_DATA_TYPE_BINARY == dt.type) {
|
||||
func->node.resType.bytes += 2;
|
||||
} else if (TSDB_DATA_TYPE_NCHAR == dt.type) {
|
||||
func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + 2;
|
||||
}
|
||||
nodesListMakeAppend(&func->pParameterList, pExpr);
|
||||
return (SNode*)func;
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ abort_parse:
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(*pQuery)->pRoot = cxt.pRootNode;
|
||||
(*pQuery)->placeholderNum = cxt.placeholderNo;
|
||||
}
|
||||
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
|
@ -937,6 +937,11 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
|
|||
}
|
||||
|
||||
*gotRow = true;
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
#endif
|
||||
}
|
||||
|
||||
// *len = pBuilder->extendedRowSize;
|
||||
|
@ -1328,10 +1333,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in
|
|||
for (int c = 0; c < spd->numOfBound; ++c) {
|
||||
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
|
||||
|
||||
if (bind[c].buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
if (bind[c].num != rowNum) {
|
||||
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
|
||||
}
|
||||
|
@ -1346,6 +1347,10 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in
|
|||
|
||||
CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m));
|
||||
} else {
|
||||
if (bind[c].buffer_type != pColSchema->type) {
|
||||
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
|
||||
}
|
||||
|
||||
int32_t colLen = pColSchema->bytes;
|
||||
if (IS_VAR_DATA_TYPE(pColSchema->type)) {
|
||||
colLen = bind[c].length[r];
|
||||
|
@ -1359,7 +1364,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in
|
|||
checkTimestamp(pDataBlock, (const char*)&tsKey);
|
||||
}
|
||||
}
|
||||
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) {
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
|
@ -1369,6 +1373,11 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in
|
|||
}
|
||||
}
|
||||
}
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
#endif
|
||||
|
||||
pDataBlock->size += extendedRowSize;
|
||||
}
|
||||
|
@ -1447,6 +1456,14 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBu
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef TD_DEBUG_PRINT_ROW
|
||||
if(rowEnd) {
|
||||
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
|
||||
tdSRowPrint(row, pSTSchema, __func__);
|
||||
taosMemoryFree(pSTSchema);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if (rowEnd) {
|
||||
|
|
|
@ -19,26 +19,23 @@
|
|||
|
||||
typedef struct SCollectPlaceholderValuesCxt {
|
||||
int32_t errCode;
|
||||
SNodeList* pValues;
|
||||
SArray* pValues;
|
||||
} SCollectPlaceholderValuesCxt;
|
||||
|
||||
static EDealRes collectPlaceholderValuesImpl(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_VALUE == nodeType(pNode) && ((SValueNode*)pNode)->placeholderNo > 0) {
|
||||
SCollectPlaceholderValuesCxt* pCxt = pContext;
|
||||
pCxt->errCode = nodesListMakeAppend(&pCxt->pValues, pNode);
|
||||
taosArrayInsert(pCxt->pValues, ((SValueNode*)pNode)->placeholderNo - 1, &pNode);
|
||||
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t collectPlaceholderValues(SPlanContext* pCxt, SQueryPlan* pPlan) {
|
||||
SCollectPlaceholderValuesCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pValues = NULL};
|
||||
pPlan->pPlaceholderValues = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
|
||||
SCollectPlaceholderValuesCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pValues = pPlan->pPlaceholderValues};
|
||||
nodesWalkPhysiPlan((SNode*)pPlan, collectPlaceholderValuesImpl, &cxt);
|
||||
if (TSDB_CODE_SUCCESS == cxt.errCode) {
|
||||
pPlan->pPlaceholderValues = cxt.pValues;
|
||||
} else {
|
||||
nodesDestroyList(cxt.pValues);
|
||||
}
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
|
@ -60,7 +57,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && pCxt->isStmtQuery) {
|
||||
if (TSDB_CODE_SUCCESS == code && pCxt->placeholderNum > 0) {
|
||||
code = collectPlaceholderValues(pCxt, *pPlan);
|
||||
}
|
||||
|
||||
|
@ -108,7 +105,7 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pVal->node.resType.type = pParam->buffer_type;
|
||||
pVal->node.resType.bytes = *(pParam->length);
|
||||
pVal->node.resType.bytes = NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes;
|
||||
switch (pParam->buffer_type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pVal->datum.b = *((bool*)pParam->buffer);
|
||||
|
@ -133,6 +130,7 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
|
|||
break;
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
|
||||
if (NULL == pVal->datum.p) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -155,7 +153,6 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
|
|||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
pVal->datum.u = *((uint64_t*)pParam->buffer);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
|
@ -168,18 +165,35 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx) {
|
||||
if (colIdx < 0) {
|
||||
int32_t index = 0;
|
||||
SNode* pNode = NULL;
|
||||
static EDealRes updatePlanQueryId(SNode* pNode, void* pContext) {
|
||||
int64_t queryId = *(uint64_t *)pContext;
|
||||
|
||||
if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pNode)) {
|
||||
SQueryPlan* planNode = (SQueryPlan*)pNode;
|
||||
planNode->queryId = queryId;
|
||||
} else if (QUERY_NODE_PHYSICAL_SUBPLAN == nodeType(pNode)) {
|
||||
SSubplan* subplanNode = (SSubplan*)pNode;
|
||||
subplanNode->id.queryId = queryId;
|
||||
}
|
||||
|
||||
FOREACH(pNode, pPlan->pPlaceholderValues) {
|
||||
setValueByBindParam((SValueNode*)pNode, pParams + index);
|
||||
++index;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId) {
|
||||
int32_t size = taosArrayGetSize(pPlan->pPlaceholderValues);
|
||||
|
||||
if (colIdx < 0) {
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, i), pParams + i);
|
||||
}
|
||||
} else {
|
||||
setValueByBindParam((SValueNode*)nodesListGetNode(pPlan->pPlaceholderValues, colIdx), pParams);
|
||||
setValueByBindParam((SValueNode*)taosArrayGetP(pPlan->pPlaceholderValues, colIdx), pParams);
|
||||
}
|
||||
|
||||
if (colIdx < 0 || ((colIdx + 1) == size)) {
|
||||
nodesWalkPhysiPlan((SNode*)pPlan, updatePlanQueryId, &queryId);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef struct SScalarCtx {
|
|||
|
||||
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
|
||||
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
|
||||
#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type))
|
||||
#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type) && (((SValueNode *)_node)->placeholderNo <= 0))
|
||||
|
||||
#define sclFatal(...) qFatal(__VA_ARGS__)
|
||||
#define sclError(...) qError(__VA_ARGS__)
|
||||
|
|
|
@ -3541,11 +3541,16 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
|||
}
|
||||
|
||||
if (QUERY_NODE_VALUE == nodeType(*pNode)) {
|
||||
SValueNode *valueNode = (SValueNode *)*pNode;
|
||||
if (valueNode->placeholderNo >= 1) {
|
||||
stat->scalarMode = true;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) {
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
SValueNode *valueNode = (SValueNode *)*pNode;
|
||||
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR != valueNode->node.resType.type) {
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
@ -3587,7 +3592,7 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
if (node->opType == OP_TYPE_NOT_IN || node->opType == OP_TYPE_NOT_LIKE || node->opType > OP_TYPE_IS_NOT_NULL) {
|
||||
if (node->opType == OP_TYPE_NOT_IN || node->opType == OP_TYPE_NOT_LIKE || node->opType > OP_TYPE_IS_NOT_NULL || node->opType == OP_TYPE_NOT_EQUAL) {
|
||||
stat->scalarMode = true;
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
|
|
@ -505,6 +505,7 @@ EDealRes sclRewriteBasedOnOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opT
|
|||
}
|
||||
|
||||
res->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
res->datum.b = false;
|
||||
|
||||
nodesDestroyNode(*pNode);
|
||||
|
@ -520,14 +521,14 @@ EDealRes sclRewriteOperatorForNullValue(SNode** pNode, SScalarCtx *ctx) {
|
|||
|
||||
if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) {
|
||||
SValueNode *valueNode = (SValueNode *)node->pLeft;
|
||||
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
|
||||
if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
|
||||
return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
|
||||
}
|
||||
}
|
||||
|
||||
if (node->pRight && (QUERY_NODE_VALUE == nodeType(node->pRight))) {
|
||||
SValueNode *valueNode = (SValueNode *)node->pRight;
|
||||
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
|
||||
if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL)) {
|
||||
return sclRewriteBasedOnOptr(pNode, ctx, node->opType);
|
||||
}
|
||||
}
|
||||
|
@ -589,7 +590,10 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
|
|||
if (colDataIsNull_s(output.columnData, 0)) {
|
||||
res->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||
} else {
|
||||
res->node.resType = node->node.resType;
|
||||
res->node.resType.type = output.columnData->info.type;
|
||||
res->node.resType.bytes = output.columnData->info.bytes;
|
||||
res->node.resType.scale = output.columnData->info.scale;
|
||||
res->node.resType.precision = output.columnData->info.precision;
|
||||
int32_t type = output.columnData->info.type;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
|
||||
|
|
|
@ -658,11 +658,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
|||
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
|
||||
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(outputType)) {
|
||||
int32_t factor = (TSDB_DATA_TYPE_NCHAR == outputType) ? TSDB_NCHAR_SIZE : 1;
|
||||
outputLen = outputLen * factor + VARSTR_HEADER_SIZE;
|
||||
}
|
||||
|
||||
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
|
||||
char *output = outputBuf;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue