From 3ace0b150b9ae69b60df98844c1184f22e89468b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 3 Aug 2022 19:14:36 +0800 Subject: [PATCH] fix(tmq): race condition --- source/client/src/tmq.c | 186 ++++++++++++++++++++-------------------- 1 file changed, 94 insertions(+), 92 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 84de7f8de9..994cf9fdc1 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -504,15 +504,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->param = pParam; - pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->paramFreeFp = taosMemoryFree; pMsgSendInfo->fp = tmqCommitCb2; pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; // send msg + atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); + atomic_add_fetch_32(&pParamSet->totalRspNum, 1); + int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); - pParamSet->waitingRspNum++; - pParamSet->totalRspNum++; return 0; } @@ -2196,7 +2197,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* cJSON* tvalue = NULL; if (IS_VAR_DATA_TYPE(pTagVal->type)) { char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1); - if(!buf) goto end; + if (!buf) goto end; dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL); tvalue = cJSON_CreateString(buf); taosMemoryFree(buf); @@ -2506,8 +2507,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { launchQueryImpl(pRequest, &pQuery, true, NULL); - if(pRequest->code == TSDB_CODE_SUCCESS){ - SCatalog* pCatalog = NULL; + if (pRequest->code == TSDB_CODE_SUCCESS) { + SCatalog* pCatalog = NULL; catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); catalogRemoveTableMeta(pCatalog, &tableName); } @@ -2575,8 +2576,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { launchQueryImpl(pRequest, &pQuery, true, NULL); - if(pRequest->code == TSDB_CODE_SUCCESS){ - SCatalog* pCatalog = NULL; + if (pRequest->code == TSDB_CODE_SUCCESS) { + SCatalog* pCatalog = NULL; catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); catalogRemoveTableMeta(pCatalog, &tableName); } @@ -2695,7 +2696,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { } launchQueryImpl(pRequest, pQuery, true, NULL); - if (pRequest->code == TSDB_CODE_SUCCESS){ + if (pRequest->code == TSDB_CODE_SUCCESS) { removeMeta(pTscObj, pRequest->tableList); } @@ -2812,7 +2813,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } launchQueryImpl(pRequest, pQuery, true, NULL); - if (pRequest->code == TSDB_CODE_SUCCESS){ + if (pRequest->code == TSDB_CODE_SUCCESS) { removeMeta(pTscObj, pRequest->tableList); } code = pRequest->code; @@ -2827,7 +2828,7 @@ end: // delete from db.tabl where .. -> delete from tabl where .. // delete from db .tabl where .. -> delete from tabl where .. -//static void getTbName(char *sql){ +// static void getTbName(char *sql){ // char *ch = sql; // // bool inBackQuote = false; @@ -2858,9 +2859,9 @@ end: //} static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { - SDeleteRes req = {0}; - SDecoder coder = {0}; - int32_t code = TSDB_CODE_SUCCESS; + SDeleteRes req = {0}; + SDecoder coder = {0}; + int32_t code = TSDB_CODE_SUCCESS; // decode and process req void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); @@ -2871,13 +2872,14 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { goto end; } -// getTbName(req.tableFName); + // getTbName(req.tableFName); char sql[256] = {0}; - sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); + sprintf(sql, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, + req.skey, req.tsColName, req.ekey); printf("delete sql:%s\n", sql); - TAOS_RES* res = taos_query(taos, sql); - SRequestObj *pRequest = (SRequestObj *)res; + TAOS_RES* res = taos_query(taos, sql); + SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { code = TSDB_CODE_SUCCESS; @@ -2985,9 +2987,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = TSDB_CODE_SUCCESS; } - if(pRequest->code == TSDB_CODE_SUCCESS){ + if (pRequest->code == TSDB_CODE_SUCCESS) { SExecResult* pRes = &pRequest->body.resInfo.execRes; - if(pRes->res != NULL){ + if (pRes->res != NULL) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } @@ -3001,23 +3003,23 @@ end: return code; } -typedef struct{ +typedef struct { SVgroupInfo vg; - void *data; -}VgData; + void* data; +} VgData; static void destroyVgHash(void* data) { VgData* vgData = (VgData*)data; taosMemoryFreeClear(vgData->data); } -int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ - int32_t code = TSDB_CODE_SUCCESS; +int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) { + int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; - SQuery *pQuery = NULL; + SQuery* pQuery = NULL; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); - if(!pRequest){ + if (!pRequest) { uError("WriteRaw:createRequest error request is null"); code = terrno; goto end; @@ -3033,9 +3035,9 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbname); - struct SCatalog *pCatalog = NULL; + struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if(code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw: get gatlog error"); goto end; } @@ -3060,17 +3062,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ } uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); uint64_t uid = pTableMeta->uid; - int32_t numOfCols = pTableMeta->tableInfo.numOfColumns; + int32_t numOfCols = pTableMeta->tableInfo.numOfColumns; uint16_t fLen = 0; - int32_t rowSize = 0; - int16_t nVar = 0; + int32_t rowSize = 0; + int16_t nVar = 0; for (int i = 0; i < numOfCols; i++) { - SSchema *schema = pTableMeta->schema + i; + SSchema* schema = pTableMeta->schema + i; fLen += TYPE_BYTES[schema->type]; rowSize += schema->bytes; - if(IS_VAR_DATA_TYPE(schema->type)){ - nVar ++; + if (IS_VAR_DATA_TYPE(schema->type)) { + nVar++; } } @@ -3079,22 +3081,22 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ int32_t schemaLen = 0; int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; - int32_t totalLen = sizeof(SSubmitReq) + submitLen; + int32_t totalLen = sizeof(SSubmitReq) + submitLen; SSubmitReq* subReq = taosMemoryCalloc(1, totalLen); SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq)); - void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); - STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); SRowBuilder rb = {0}; tdSRowInit(&rb, pTableMeta->sversion); tdSRowSetTpInfo(&rb, numOfCols, fLen); int32_t dataLen = 0; - char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); + char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)); int32_t* colLength = (int32_t*)pStart; pStart += sizeof(int32_t) * numOfCols; - SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn)); + SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn)); for (int32_t i = 0; i < numOfCols; ++i) { if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) { @@ -3113,7 +3115,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ tdSRowResetBuf(&rb, rowData); int32_t offset = 0; for (int32_t k = 0; k < numOfCols; k++) { - const SSchema* pColumn = &pTableMeta->schema[k]; + const SSchema* pColumn = &pTableMeta->schema[k]; if (IS_VAR_DATA_TYPE(pColumn->type)) { if (pCol[k].offset[j] != -1) { @@ -3159,17 +3161,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->haveResultSet = false; pQuery->msgType = TDMT_VND_SUBMIT; - pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); if (NULL == pQuery->pRoot) { uError("create pQuery->pRoot error"); code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot); + SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); nodeStmt->payloadType = PAYLOAD_TYPE_KV; nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); - SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); if (NULL == dst) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto end; @@ -3183,7 +3185,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){ subReq->header.contLen = htonl(subReq->length); subReq->length = htonl(subReq->length); subReq->numOfBlocks = htonl(subReq->numOfBlocks); - subReq = NULL; // no need free + subReq = NULL; // no need free taosArrayPush(nodeStmt->pDataBlocks, &dst); launchQueryImpl(pRequest, pQuery, true, NULL); @@ -3195,16 +3197,16 @@ end: return code; } -static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ - int32_t code = TSDB_CODE_SUCCESS; - SHashObj *pVgHash = NULL; - SQuery *pQuery = NULL; +static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pVgHash = NULL; + SQuery* pQuery = NULL; SMqRspObj rspObj = {0}; - SDecoder decoder = {0}; + SDecoder decoder = {0}; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); - if(!pRequest){ + if (!pRequest) { uError("WriteRaw:createRequest error request is null"); return terrno; } @@ -3214,7 +3216,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ tDecoderInit(&decoder, data, dataLen); code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp); - if (code != 0){ + if (code != 0) { uError("WriteRaw:decode smqDataRsp error"); code = TSDB_CODE_INVALID_MSG; goto end; @@ -3228,9 +3230,9 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashSetFreeFp(pVgHash, destroyVgHash); - struct SCatalog *pCatalog = NULL; + struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); - if(code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw: get gatlog error"); goto end; } @@ -3252,20 +3254,20 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols); code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false); - if(code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw: setQueryResultFromRsp error"); goto end; } uint16_t fLen = 0; - int32_t rowSize = 0; - int16_t nVar = 0; + int32_t rowSize = 0; + int16_t nVar = 0; for (int i = 0; i < pSW->nCols; i++) { - SSchema *schema = pSW->pSchema + i; + SSchema* schema = pSW->pSchema + i; fLen += TYPE_BYTES[schema->type]; rowSize += schema->bytes; - if(IS_VAR_DATA_TYPE(schema->type)){ - nVar ++; + if (IS_VAR_DATA_TYPE(schema->type)) { + nVar++; } } @@ -3276,7 +3278,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); - if(!tbName){ + if (!tbName) { uError("WriteRaw: tbname is null"); code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -3296,12 +3298,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ SSubmitReq* subReq = NULL; SSubmitBlk* blk = NULL; - void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); - if(hData){ + void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); + if (hData) { vgData = *(VgData*)hData; int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; - void *tmp = taosMemoryRealloc(vgData.data, totalLen); + void* tmp = taosMemoryRealloc(vgData.data, totalLen); if (tmp == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto end; @@ -3310,15 +3312,15 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ ((VgData*)hData)->data = tmp; subReq = (SSubmitReq*)(vgData.data); blk = POINTER_SHIFT(vgData.data, subReq->length); - }else{ + } else { int32_t totalLen = sizeof(SSubmitReq) + submitLen; - void *tmp = taosMemoryCalloc(1, totalLen); + void* tmp = taosMemoryCalloc(1, totalLen); if (tmp == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto end; } vgData.data = tmp; - taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData)); + taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData)); subReq = (SSubmitReq*)(vgData.data); subReq->length = sizeof(SSubmitReq); subReq->numOfBlocks = 0; @@ -3336,7 +3338,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ uint64_t uid = pTableMeta->uid; taosMemoryFreeClear(pTableMeta); - void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); SRowBuilder rb = {0}; @@ -3352,12 +3354,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ int32_t offset = 0; for (int32_t k = 0; k < pSW->nCols; k++) { - const SSchema* pColumn = &pSW->pSchema[k]; - char *data = rspObj.resInfo.row[k]; + const SSchema* pColumn = &pSW->pSchema[k]; + char* data = rspObj.resInfo.row[k]; if (!data) { tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); } else { - if(IS_VAR_DATA_TYPE(pColumn->type)){ + if (IS_VAR_DATA_TYPE(pColumn->type)) { data -= VARSTR_HEADER_SIZE; } tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); @@ -3389,21 +3391,21 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->haveResultSet = false; pQuery->msgType = TDMT_VND_SUBMIT; - pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); if (NULL == pQuery->pRoot) { uError("create pQuery->pRoot error"); code = TSDB_CODE_OUT_OF_MEMORY; goto end; } - SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot); + SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); nodeStmt->payloadType = PAYLOAD_TYPE_KV; int32_t numOfVg = taosHashGetSize(pVgHash); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); - VgData *vData = (VgData *)taosHashIterate(pVgHash, NULL); + VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL); while (vData) { - SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); if (NULL == dst) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto end; @@ -3413,14 +3415,14 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){ dst->numOfTables = subReq->numOfBlocks; dst->size = subReq->length; dst->pData = (char*)subReq; - vData->data = NULL; // no need free + vData->data = NULL; // no need free subReq->header.vgId = htonl(dst->vg.vgId); subReq->version = htonl(1); subReq->header.contLen = htonl(subReq->length); subReq->length = htonl(subReq->length); subReq->numOfBlocks = htonl(subReq->numOfBlocks); taosArrayPush(nodeStmt->pDataBlocks, &dst); - vData = (VgData *)taosHashIterate(pVgHash, vData); + vData = (VgData*)taosHashIterate(pVgHash, vData); } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -3459,8 +3461,8 @@ char* tmq_get_json_meta(TAOS_RES* res) { void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } -int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) { - if (!raw || !res){ +int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { + if (!raw || !res) { return TSDB_CODE_INVALID_PARA; } if (TD_RES_TMQ_META(res)) { @@ -3468,8 +3470,8 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) { raw->raw = pMetaRspObj->metaRsp.metaRsp; raw->raw_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_type = pMetaRspObj->metaRsp.resMsgType; - } else if(TD_RES_TMQ(res)){ - SMqRspObj *rspObj = ((SMqRspObj*)res); + } else if (TD_RES_TMQ(res)) { + SMqRspObj* rspObj = ((SMqRspObj*)res); int32_t len = 0; int32_t code = 0; @@ -3478,7 +3480,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) { return -1; } - void *buf = taosMemoryCalloc(1, len); + void* buf = taosMemoryCalloc(1, len); SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); tEncodeSMqDataRsp(&encoder, &rspObj->rsp); @@ -3494,31 +3496,31 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) { } void tmq_free_raw(tmq_raw_data raw) { - if (raw.raw_type == RES_TYPE__TMQ){ + if (raw.raw_type == RES_TYPE__TMQ) { taosMemoryFree(raw.raw); } } -int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){ +int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { if (!taos) { return TSDB_CODE_INVALID_PARA; } - if(raw.raw_type == TDMT_VND_CREATE_STB) { + if (raw.raw_type == TDMT_VND_CREATE_STB) { return taosCreateStb(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == TDMT_VND_ALTER_STB){ + } else if (raw.raw_type == TDMT_VND_ALTER_STB) { return taosCreateStb(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == TDMT_VND_DROP_STB){ + } else if (raw.raw_type == TDMT_VND_DROP_STB) { return taosDropStb(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == TDMT_VND_CREATE_TABLE){ + } else if (raw.raw_type == TDMT_VND_CREATE_TABLE) { return taosCreateTable(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == TDMT_VND_ALTER_TABLE){ + } else if (raw.raw_type == TDMT_VND_ALTER_TABLE) { return taosAlterTable(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == TDMT_VND_DROP_TABLE) { + } else if (raw.raw_type == TDMT_VND_DROP_TABLE) { return taosDropTable(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == TDMT_VND_DELETE){ + } else if (raw.raw_type == TDMT_VND_DELETE) { return taosDeleteData(taos, raw.raw, raw.raw_len); - }else if(raw.raw_type == RES_TYPE__TMQ){ + } else if (raw.raw_type == RES_TYPE__TMQ) { return tmqWriteRaw(taos, raw.raw, raw.raw_len); } return TSDB_CODE_INVALID_PARA;