Merge pull request #19720 from taosdata/feat/3.0_insert_explain
feat: explain insert select
This commit is contained in:
commit
c486530d23
|
@ -77,8 +77,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
|||
|
||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
|
||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||
|
||||
pBuf->useSize += pEntry->dataLen;
|
||||
|
||||
|
@ -135,7 +135,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
|
|||
taosFreeQitem(pBuf);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
||||
toDataCacheEntry(pDispatcher, pInput, pBuf);
|
||||
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
|
||||
|
||||
|
@ -162,14 +162,16 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
|||
|
||||
SDataDispatchBuf* pBuf = NULL;
|
||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||
taosFreeQitem(pBuf);
|
||||
if (pBuf != NULL) {
|
||||
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||
taosFreeQitem(pBuf);
|
||||
}
|
||||
|
||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
|
||||
*pLen = pEntry->dataLen;
|
||||
|
||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||
|
||||
*pQueryEnd = pDispatcher->queryEnd;
|
||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
||||
|
@ -192,8 +194,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
|||
pOutput->numOfCols = pEntry->numOfCols;
|
||||
pOutput->compressed = pEntry->compressed;
|
||||
|
||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||
|
||||
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
|
||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||
|
|
|
@ -517,6 +517,7 @@ cmd ::= RESET QUERY CACHE.
|
|||
|
||||
/************************************************ explain *************************************************************/
|
||||
cmd ::= EXPLAIN analyze_opt(A) explain_options(B) query_or_subquery(C). { pCxt->pRootNode = createExplainStmt(pCxt, A, B, C); }
|
||||
cmd ::= EXPLAIN analyze_opt(A) explain_options(B) insert_query(C). { pCxt->pRootNode = createExplainStmt(pCxt, A, B, C); }
|
||||
|
||||
%type analyze_opt { bool }
|
||||
%destructor analyze_opt { }
|
||||
|
@ -599,9 +600,11 @@ cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B).
|
|||
cmd ::= query_or_subquery(A). { pCxt->pRootNode = A; }
|
||||
|
||||
/************************************************ insert **************************************************************/
|
||||
cmd ::= INSERT INTO full_table_name(A)
|
||||
NK_LP col_name_list(B) NK_RP query_or_subquery(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); }
|
||||
cmd ::= INSERT INTO full_table_name(A) query_or_subquery(B). { pCxt->pRootNode = createInsertStmt(pCxt, A, NULL, B); }
|
||||
cmd ::= insert_query(A). { pCxt->pRootNode = A; }
|
||||
|
||||
insert_query(A) ::= INSERT INTO full_table_name(D)
|
||||
NK_LP col_name_list(B) NK_RP query_or_subquery(C). { A = createInsertStmt(pCxt, D, B, C); }
|
||||
insert_query(A) ::= INSERT INTO full_table_name(C) query_or_subquery(B). { A = createInsertStmt(pCxt, C, NULL, B); }
|
||||
|
||||
/************************************************ literal *************************************************************/
|
||||
literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B)); }
|
||||
|
|
|
@ -290,7 +290,8 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
|
|||
}
|
||||
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = taosHashPut(pHash, id, idLen, pTableCxt, POINTER_BYTES);
|
||||
void* pData = *pTableCxt; // deal scan coverity
|
||||
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -3517,7 +3517,7 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
|
|||
if (comp > 0) {
|
||||
SNode* pRightFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pRight, pLeftExpr->resType, &pRightFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
if (TSDB_CODE_SUCCESS != code || NULL == pRightFunc) { // deal scan coverity
|
||||
return code;
|
||||
}
|
||||
REPLACE_LIST2_NODE(pRightFunc);
|
||||
|
@ -3525,7 +3525,7 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS
|
|||
} else if (comp < 0) {
|
||||
SNode* pLeftFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pLeft, pRightExpr->resType, &pLeftFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
if (TSDB_CODE_SUCCESS != code || NULL == pLeftFunc) { // deal scan coverity
|
||||
return code;
|
||||
}
|
||||
REPLACE_LIST1_NODE(pLeftFunc);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -234,7 +234,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
}
|
||||
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
|
||||
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) {
|
||||
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
|
||||
pJob->attr.needFetch = true;
|
||||
}
|
||||
}
|
||||
|
@ -484,7 +484,7 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
|
|||
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
|
||||
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||
}
|
||||
|
||||
|
||||
schUpdateJobErrCode(pJob, errCode);
|
||||
|
||||
int32_t code = atomic_load_32(&pJob->errCode);
|
||||
|
|
Loading…
Reference in New Issue