enh: support multiple groups in exchange operator
This commit is contained in:
parent
c65dc5d3e9
commit
65adb259ff
|
@ -29,7 +29,7 @@ struct SMetaData;
|
||||||
typedef struct SStmtCallback {
|
typedef struct SStmtCallback {
|
||||||
TAOS_STMT* pStmt;
|
TAOS_STMT* pStmt;
|
||||||
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
|
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
|
||||||
int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*, const char*);
|
int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, SName*, bool, SHashObj*, SHashObj*, const char*);
|
||||||
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
|
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
|
||||||
} SStmtCallback;
|
} SStmtCallback;
|
||||||
|
|
||||||
|
|
|
@ -2215,11 +2215,16 @@ void syncQueryFn(void* param, void* res, int32_t code) {
|
||||||
SSyncQueryParam* pParam = param;
|
SSyncQueryParam* pParam = param;
|
||||||
pParam->pRequest = res;
|
pParam->pRequest = res;
|
||||||
|
|
||||||
if (pParam->pRequest) {
|
if (pParam->pRequest != NULL) {
|
||||||
|
pParam->pRequest->syncQuery = true;
|
||||||
pParam->pRequest->code = code;
|
pParam->pRequest->code = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_post(&pParam->sem);
|
tsem_post(&pParam->sem);
|
||||||
|
|
||||||
|
if (NULL == res) {
|
||||||
|
taosMemoryFree(param);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
|
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
|
||||||
|
@ -2293,9 +2298,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
||||||
|
|
||||||
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
|
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
|
||||||
tsem_wait(¶m->sem);
|
tsem_wait(¶m->sem);
|
||||||
if (param->pRequest != NULL) {
|
|
||||||
param->pRequest->syncQuery = true;
|
|
||||||
}
|
|
||||||
return param->pRequest;
|
return param->pRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2310,8 +2313,6 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
|
||||||
|
|
||||||
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
|
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
|
||||||
tsem_wait(¶m->sem);
|
tsem_wait(¶m->sem);
|
||||||
if (param->pRequest != NULL) {
|
|
||||||
param->pRequest->syncQuery = true;
|
|
||||||
}
|
|
||||||
return param->pRequest;
|
return param->pRequest;
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,9 +152,12 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, const char* sTableName, bool autoCreateTbl) {
|
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName, bool autoCreateTbl) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(tbName, tbFName);
|
||||||
|
|
||||||
|
memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
|
||||||
strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1);
|
strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1);
|
||||||
pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
|
pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
|
||||||
|
|
||||||
|
@ -178,11 +181,11 @@ int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockH
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl,
|
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
|
||||||
SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName) {
|
SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName) {
|
||||||
STscStmt* pStmt = (STscStmt*)stmt;
|
STscStmt* pStmt = (STscStmt*)stmt;
|
||||||
|
|
||||||
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName, sTableName, autoCreateTbl));
|
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl));
|
||||||
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl));
|
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash, autoCreateTbl));
|
||||||
|
|
||||||
pStmt->sql.autoCreateTbl = autoCreateTbl;
|
pStmt->sql.autoCreateTbl = autoCreateTbl;
|
||||||
|
@ -773,7 +776,9 @@ int stmtAddBatch(TAOS_STMT* stmt) {
|
||||||
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
||||||
tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);
|
tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);
|
||||||
|
|
||||||
size_t keyLen = 0;
|
int32_t code = 0;
|
||||||
|
int32_t finalCode = 0;
|
||||||
|
size_t keyLen = 0;
|
||||||
STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
STableDataBlocks* pBlock = *pIter;
|
STableDataBlocks* pBlock = *pIter;
|
||||||
|
@ -809,10 +814,20 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
||||||
} else {
|
} else {
|
||||||
tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
|
tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);
|
||||||
if (NULL == pStmt->pCatalog) {
|
if (NULL == pStmt->pCatalog) {
|
||||||
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
|
code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
|
||||||
|
if (code) {
|
||||||
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
|
finalCode = code;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STMT_ERR_RET(stmtCreateRequest(pStmt));
|
code = stmtCreateRequest(pStmt);
|
||||||
|
if (code) {
|
||||||
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
|
finalCode = code;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
|
||||||
|
@ -824,9 +839,10 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
||||||
taos_free_result(pStmt->exec.pRequest);
|
taos_free_result(pStmt->exec.pRequest);
|
||||||
pStmt->exec.pRequest = NULL;
|
pStmt->exec.pRequest = NULL;
|
||||||
|
|
||||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
if (code || NULL == pTableMeta) {
|
||||||
tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
return TSDB_CODE_SUCCESS;
|
finalCode = code;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->uid = pTableMeta->uid;
|
pMeta->uid = pTableMeta->uid;
|
||||||
|
@ -836,7 +852,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
|
||||||
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return finalCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int stmtExec(TAOS_STMT* stmt) {
|
int stmtExec(TAOS_STMT* stmt) {
|
||||||
|
|
|
@ -1205,7 +1205,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
||||||
stbCtx.pName = &stbName;
|
stbCtx.pName = &stbName;
|
||||||
|
|
||||||
STableMeta* stbMeta = NULL;
|
STableMeta* stbMeta = NULL;
|
||||||
ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta);
|
(void)ctgReadTbMetaFromCache(pCtg, &stbCtx, &stbMeta);
|
||||||
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
|
if (stbMeta && stbMeta->sversion >= pOut->tbMeta->sversion) {
|
||||||
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
|
||||||
exist = 1;
|
exist = 1;
|
||||||
|
|
|
@ -782,13 +782,18 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
||||||
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
|
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
|
||||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId));
|
int32_t nodeNum = 0;
|
||||||
if (NULL == group) {
|
for (int32_t i = pExchNode->srcStartGroupId; i <= pExchNode->srcEndGroupId; ++i) {
|
||||||
qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId);
|
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId));
|
||||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
if (NULL == group) {
|
||||||
|
qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId);
|
||||||
|
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeNum += group->nodeNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : group->nodeNum);
|
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : nodeNum);
|
||||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||||
if (pResNode->pExecInfo) {
|
if (pResNode->pExecInfo) {
|
||||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
|
@ -819,7 +824,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel));
|
for (int32_t i = pExchNode->srcStartGroupId; i <= pExchNode->srcEndGroupId; ++i) {
|
||||||
|
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, i, level + 1, pExchNode->singleChannel));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
|
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
|
||||||
|
|
|
@ -1407,7 +1407,7 @@ SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* p
|
||||||
|
|
||||||
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo) {
|
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo) {
|
||||||
CHECK_PARSER_STATUS(pCxt);
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
char password[TSDB_USET_PASSWORD_LEN] = {0};
|
char password[TSDB_USET_PASSWORD_LEN + 3] = {0};
|
||||||
if (!checkUserName(pCxt, pUserName) || !checkPassword(pCxt, pPassword, password)) {
|
if (!checkUserName(pCxt, pUserName) || !checkPassword(pCxt, pPassword, password)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1529,9 +1529,7 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
|
||||||
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
|
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
|
||||||
|
|
||||||
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
|
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
|
||||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing,
|
||||||
tNameExtractFullName(&pStmt->targetTableName, tbFName);
|
|
||||||
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, tbFName, pStmt->usingTableProcessing,
|
|
||||||
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname);
|
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname);
|
||||||
|
|
||||||
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
||||||
|
|
|
@ -192,6 +192,9 @@ void* taosArrayPop(SArray* pArray) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void* taosArrayGet(const SArray* pArray, size_t index) {
|
void* taosArrayGet(const SArray* pArray, size_t index) {
|
||||||
|
if (NULL == pArray) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
assert(index < pArray->size);
|
assert(index < pArray->size);
|
||||||
return TARRAY_GET_ELEM(pArray, index);
|
return TARRAY_GET_ELEM(pArray, index);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
|
|
||||||
#define LOG_MAX_LINE_SIZE (1024)
|
#define LOG_MAX_LINE_SIZE (1024)
|
||||||
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
|
||||||
#define LOG_MAX_LINE_DUMP_SIZE (65 * 1024)
|
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
|
||||||
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
|
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
|
||||||
|
|
||||||
#define LOG_FILE_NAME_LEN 300
|
#define LOG_FILE_NAME_LEN 300
|
||||||
|
@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
|
||||||
if (!osLogSpaceAvailable()) return;
|
if (!osLogSpaceAvailable()) return;
|
||||||
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
|
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
|
||||||
|
|
||||||
char buffer[LOG_MAX_LINE_DUMP_BUFFER_SIZE];
|
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
|
||||||
int32_t len = taosBuildLogHead(buffer, flags);
|
int32_t len = taosBuildLogHead(buffer, flags);
|
||||||
|
|
||||||
va_list argpointer;
|
va_list argpointer;
|
||||||
|
@ -509,6 +509,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
|
||||||
buffer[len] = 0;
|
buffer[len] = 0;
|
||||||
|
|
||||||
taosPrintLogImp(level, dflag, buffer, len);
|
taosPrintLogImp(level, dflag, buffer, len);
|
||||||
|
taosMemoryFree(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosDumpData(unsigned char *msg, int32_t len) {
|
void taosDumpData(unsigned char *msg, int32_t len) {
|
||||||
|
|
Loading…
Reference in New Issue