feat: sql command 'drop table'
This commit is contained in:
parent
18a12ae3c7
commit
8818ed48f9
|
@ -1605,6 +1605,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
|
|||
// TDMT_VND_DROP_TABLE =================
|
||||
typedef struct {
|
||||
const char* name;
|
||||
int8_t igNotExists;
|
||||
} SVDropTbReq;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -616,6 +616,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
|
||||
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
|
||||
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
|
||||
#define TSDB_CODE_PAR_INVALID_DROP_STABLE TAOS_DEF_ERROR_CODE(0, 0x263A)
|
||||
|
||||
//planner
|
||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||
|
|
|
@ -3800,6 +3800,7 @@ static int32_t tEncodeSVDropTbReq(SCoder *pCoder, const SVDropTbReq *pReq) {
|
|||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1;
|
||||
|
||||
tEndEncode(pCoder);
|
||||
return 0;
|
||||
|
@ -3809,6 +3810,7 @@ static int32_t tDecodeSVDropTbReq(SCoder *pCoder, SVDropTbReq *pReq) {
|
|||
if (tStartDecode(pCoder) < 0) return -1;
|
||||
|
||||
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1;
|
||||
|
||||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
|
|
|
@ -3134,11 +3134,11 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct SVgroupTablesBatch {
|
||||
typedef struct SVgroupCreateTableBatch {
|
||||
SVCreateTbBatchReq req;
|
||||
SVgroupInfo info;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
} SVgroupTablesBatch;
|
||||
} SVgroupCreateTableBatch;
|
||||
|
||||
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
||||
taosMemoryFreeClear(pReq->name);
|
||||
|
@ -3146,7 +3146,7 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
|||
}
|
||||
|
||||
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo,
|
||||
SVgroupTablesBatch* pBatch) {
|
||||
SVgroupCreateTableBatch* pBatch) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
|
||||
strcpy(name.dbname, pStmt->dbName);
|
||||
|
@ -3180,13 +3180,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
|
||||
static int32_t serializeVgroupCreateTableBatch(SVgroupCreateTableBatch* pTbBatch, SArray* pBufArray) {
|
||||
int tlen;
|
||||
SCoder coder = {0};
|
||||
|
||||
int32_t ret = 0;
|
||||
tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen, ret);
|
||||
tlen += sizeof(SMsgHead); //+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
|
||||
tlen += sizeof(SMsgHead);
|
||||
void* buf = taosMemoryMalloc(tlen);
|
||||
if (NULL == buf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3212,7 +3212,7 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
|
||||
static void destroyCreateTbReqBatch(SVgroupCreateTableBatch* pTbBatch) {
|
||||
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
|
||||
|
@ -3257,10 +3257,10 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SVgroupTablesBatch tbatch = {0};
|
||||
int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch);
|
||||
SVgroupCreateTableBatch tbatch = {0};
|
||||
int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
|
||||
code = serializeVgroupCreateTableBatch(&tbatch, *pBufArray);
|
||||
}
|
||||
|
||||
destroyCreateTbReqBatch(&tbatch);
|
||||
|
@ -3305,9 +3305,9 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
|
|||
req.ctb.suid = suid;
|
||||
req.ctb.pTag = row;
|
||||
|
||||
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
if (pTableBatch == NULL) {
|
||||
SVgroupTablesBatch tBatch = {0};
|
||||
SVgroupCreateTableBatch tBatch = {0};
|
||||
tBatch.info = *pVgInfo;
|
||||
strcpy(tBatch.dbName, pDbName);
|
||||
|
||||
|
@ -3480,21 +3480,21 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
|
|||
return code;
|
||||
}
|
||||
|
||||
static SArray* serializeVgroupsTablesBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
|
||||
static SArray* serializeVgroupsCreateTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
|
||||
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
|
||||
if (NULL == pBufArray) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SVgroupTablesBatch* pTbBatch = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SVgroupCreateTableBatch* pTbBatch = NULL;
|
||||
do {
|
||||
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
|
||||
if (pTbBatch == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
serializeVgroupTablesBatch(pTbBatch, pBufArray);
|
||||
serializeVgroupCreateTableBatch(pTbBatch, pBufArray);
|
||||
destroyCreateTbReqBatch(pTbBatch);
|
||||
} while (true);
|
||||
|
||||
|
@ -3519,7 +3519,143 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
|
|||
}
|
||||
}
|
||||
|
||||
SArray* pBufArray = serializeVgroupsTablesBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
|
||||
SArray* pBufArray = serializeVgroupsCreateTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
if (NULL == pBufArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return rewriteToVnodeModifOpStmt(pQuery, pBufArray);
|
||||
}
|
||||
|
||||
typedef struct SVgroupDropTableBatch {
|
||||
SVDropTbBatchReq req;
|
||||
SVgroupInfo info;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
} SVgroupDropTableBatch;
|
||||
|
||||
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) {
|
||||
SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists};
|
||||
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
if (NULL == pTableBatch) {
|
||||
SVgroupDropTableBatch tBatch = {0};
|
||||
tBatch.info = *pVgInfo;
|
||||
tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
|
||||
taosArrayPush(tBatch.req.pArray, &req);
|
||||
|
||||
taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch));
|
||||
} else { // add to the correct vgroup
|
||||
taosArrayPush(pTableBatch->req.pArray, &req);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableClause* pClause, bool* pIsSuperTable,
|
||||
SHashObj* pVgroupHashmap) {
|
||||
STableMeta* pTableMeta = NULL;
|
||||
int32_t code = getTableMeta(pCxt, pClause->dbName, pClause->tableName, &pTableMeta);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && TSDB_SUPER_TABLE == pTableMeta->tableType) {
|
||||
*pIsSuperTable = true;
|
||||
goto over;
|
||||
}
|
||||
|
||||
*pIsSuperTable = false;
|
||||
|
||||
SVgroupInfo info = {0};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info);
|
||||
}
|
||||
|
||||
over:
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
static void destroyDropTbReqBatch(SVgroupDropTableBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); }
|
||||
|
||||
static int32_t serializeVgroupDropTableBatch(SVgroupDropTableBatch* pTbBatch, SArray* pBufArray) {
|
||||
int tlen;
|
||||
SCoder coder = {0};
|
||||
|
||||
int32_t ret = 0;
|
||||
tEncodeSize(tEncodeSVDropTbBatchReq, &pTbBatch->req, tlen, ret);
|
||||
tlen += sizeof(SMsgHead);
|
||||
void* buf = taosMemoryMalloc(tlen);
|
||||
if (NULL == buf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
|
||||
((SMsgHead*)buf)->contLen = htonl(tlen);
|
||||
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER);
|
||||
tEncodeSVDropTbBatchReq(&coder, &pTbBatch->req);
|
||||
tCoderClear(&coder);
|
||||
|
||||
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == pVgData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pVgData->vg = pTbBatch->info;
|
||||
pVgData->pData = buf;
|
||||
pVgData->size = tlen;
|
||||
pVgData->numOfTables = (int32_t)taosArrayGetSize(pTbBatch->req.pArray);
|
||||
taosArrayPush(pBufArray, &pVgData);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SArray* serializeVgroupsDropTableBatch(int32_t acctId, SHashObj* pVgroupHashmap) {
|
||||
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
|
||||
if (NULL == pBufArray) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SVgroupDropTableBatch* pTbBatch = NULL;
|
||||
do {
|
||||
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
|
||||
if (pTbBatch == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
serializeVgroupDropTableBatch(pTbBatch, pBufArray);
|
||||
destroyDropTbReqBatch(pTbBatch);
|
||||
} while (true);
|
||||
|
||||
return pBufArray;
|
||||
}
|
||||
|
||||
static int32_t rewriteDropTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SDropTableStmt* pStmt = (SDropTableStmt*)pQuery->pRoot;
|
||||
|
||||
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pVgroupHashmap) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
bool isSuperTable = false;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pStmt->pTables) {
|
||||
int32_t code = buildDropTableVgroupHashmap(pCxt, (SDropTableClause*)pNode, &isSuperTable, pVgroupHashmap);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
return code;
|
||||
}
|
||||
if (isSuperTable && LIST_LENGTH(pStmt->pTables) > 1) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DROP_STABLE);
|
||||
}
|
||||
}
|
||||
|
||||
if (isSuperTable) {
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pBufArray = serializeVgroupsDropTableBatch(pCxt->pParseCxt->acctId, pVgroupHashmap);
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
if (NULL == pBufArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3565,6 +3701,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
code = rewriteCreateMultiTable(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_DROP_TABLE_STMT:
|
||||
code = rewriteDropTable(pCxt, pQuery);
|
||||
break;
|
||||
case QUERY_NODE_ALTER_TABLE_STMT:
|
||||
if (TSDB_ALTER_TABLE_UPDATE_TAG_VAL == ((SAlterTableStmt*)pQuery->pRoot)->alterType) {
|
||||
code = rewriteAlterTable(pCxt, pQuery);
|
||||
|
|
|
@ -126,6 +126,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "slimit/soffset only available for PARTITION BY query";
|
||||
case TSDB_CODE_PAR_INVALID_TOPIC_QUERY:
|
||||
return "Invalid topic query";
|
||||
case TSDB_CODE_PAR_INVALID_DROP_STABLE:
|
||||
return "Cannot drop super table in batch";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
|
@ -944,9 +944,16 @@ static int32_t createSetOperatorLogicNode(SLogicPlanContext* pCxt, SSetOperator*
|
|||
}
|
||||
|
||||
static int32_t getMsgType(ENodeType sqlType) {
|
||||
return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType)
|
||||
? TDMT_VND_CREATE_TABLE
|
||||
: TDMT_VND_SUBMIT;
|
||||
switch (sqlType) {
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
return TDMT_VND_CREATE_TABLE;
|
||||
case QUERY_NODE_DROP_TABLE_STMT:
|
||||
return TDMT_VND_DROP_TABLE;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return TDMT_VND_SUBMIT;
|
||||
}
|
||||
|
||||
static int32_t createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt, SLogicNode** pLogicNode) {
|
||||
|
|
|
@ -245,6 +245,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
|
|||
SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
case TDMT_VND_CREATE_TABLE_RSP:
|
||||
case TDMT_VND_DROP_TABLE_RSP:
|
||||
case TDMT_VND_SUBMIT_RSP:
|
||||
break;
|
||||
default:
|
||||
|
@ -369,7 +370,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
}
|
||||
|
||||
for (int32_t n = 0; n < childNum; ++n) {
|
||||
SSubplan * child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
|
||||
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
|
||||
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
|
||||
if (NULL == childTask || NULL == *childTask) {
|
||||
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||
|
@ -401,7 +402,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
}
|
||||
|
||||
for (int32_t n = 0; n < parentNum; ++n) {
|
||||
SSubplan * parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
|
||||
SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
|
||||
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
|
||||
if (NULL == parentTask || NULL == *parentTask) {
|
||||
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||
|
@ -491,7 +492,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SSchLevel level = {0};
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SSchLevel * pLevel = NULL;
|
||||
SSchLevel *pLevel = NULL;
|
||||
|
||||
level.status = JOB_TASK_STATUS_NOT_START;
|
||||
|
||||
|
@ -1094,6 +1095,30 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_DROP_TABLE_RSP: {
|
||||
SVDropTbBatchRsp batchRsp = {0};
|
||||
if (msg) {
|
||||
SCoder coder = {0};
|
||||
tCoderInit(&coder, TD_LITTLE_ENDIAN, msg, msgSize, TD_DECODER);
|
||||
code = tDecodeSVDropTbBatchRsp(&coder, &batchRsp);
|
||||
if (TSDB_CODE_SUCCESS == code && batchRsp.pArray) {
|
||||
int32_t num = taosArrayGetSize(batchRsp.pArray);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SVDropTbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
|
||||
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) {
|
||||
tCoderClear(&coder);
|
||||
SCH_ERR_JRET(rsp->code);
|
||||
}
|
||||
}
|
||||
}
|
||||
tCoderClear(&coder);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(rspCode);
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_SUBMIT_RSP: {
|
||||
if (msg) {
|
||||
SSubmitRsp *rsp = (SSubmitRsp *)msg;
|
||||
|
@ -1267,7 +1292,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
|
|||
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
SSchTask * pTask = NULL;
|
||||
SSchTask *pTask = NULL;
|
||||
|
||||
SSchJob *pJob = schAcquireJob(pParam->refId);
|
||||
if (NULL == pJob) {
|
||||
|
@ -1316,6 +1341,10 @@ int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t
|
|||
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleDropTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_DROP_TABLE_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
||||
}
|
||||
|
@ -1412,6 +1441,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
case TDMT_VND_CREATE_TABLE:
|
||||
*fp = schHandleCreateTableCallback;
|
||||
break;
|
||||
case TDMT_VND_DROP_TABLE:
|
||||
*fp = schHandleDropTableCallback;
|
||||
break;
|
||||
case TDMT_VND_SUBMIT:
|
||||
*fp = schHandleSubmitCallback;
|
||||
break;
|
||||
|
@ -1617,8 +1649,8 @@ _return:
|
|||
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||
int32_t code = 0;
|
||||
SSchHbCallbackParam *param = NULL;
|
||||
SMsgSendInfo * pMsgSendInfo = NULL;
|
||||
SQueryNodeAddr * addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SMsgSendInfo *pMsgSendInfo = NULL;
|
||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||
SQueryNodeEpId epId = {0};
|
||||
|
||||
epId.nodeId = addr->nodeId;
|
||||
|
@ -1759,10 +1791,10 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
|
|||
}
|
||||
|
||||
SRpcCtxVal dst = {0};
|
||||
void * pIter = taosHashIterate(pSrc->args, NULL);
|
||||
void *pIter = taosHashIterate(pSrc->args, NULL);
|
||||
while (pIter) {
|
||||
SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
|
||||
int32_t * msgType = taosHashGetKey(pIter, NULL);
|
||||
int32_t *msgType = taosHashGetKey(pIter, NULL);
|
||||
|
||||
dst = *pVal;
|
||||
dst.val = NULL;
|
||||
|
@ -1916,7 +1948,7 @@ _return:
|
|||
|
||||
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
|
||||
uint32_t msgSize = 0;
|
||||
void * msg = NULL;
|
||||
void *msg = NULL;
|
||||
int32_t code = 0;
|
||||
bool isCandidateAddr = false;
|
||||
bool persistHandle = false;
|
||||
|
@ -1931,6 +1963,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE:
|
||||
case TDMT_VND_DROP_TABLE:
|
||||
case TDMT_VND_SUBMIT: {
|
||||
msgSize = pTask->msgLen;
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
|
@ -2673,7 +2706,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
|
|||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||
|
||||
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
|
||||
SSchTask * pTask = taosArrayGet(pLevel->subTasks, m);
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
|
||||
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
|
||||
|
||||
taosArrayPush(pSub, &subDesc);
|
||||
|
|
Loading…
Reference in New Issue