Merge pull request #15718 from taosdata/feature/stream
fix(tmq): race condition
This commit is contained in:
commit
9fc8cfc3b5
|
@ -504,15 +504,16 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
||||||
pMsgSendInfo->requestId = generateRequestId();
|
pMsgSendInfo->requestId = generateRequestId();
|
||||||
pMsgSendInfo->requestObjRefId = 0;
|
pMsgSendInfo->requestObjRefId = 0;
|
||||||
pMsgSendInfo->param = pParam;
|
pMsgSendInfo->param = pParam;
|
||||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||||
pMsgSendInfo->fp = tmqCommitCb2;
|
pMsgSendInfo->fp = tmqCommitCb2;
|
||||||
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
|
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
|
||||||
// send msg
|
// send msg
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||||
|
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
||||||
pParamSet->waitingRspNum++;
|
|
||||||
pParamSet->totalRspNum++;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2196,7 +2197,7 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray*
|
||||||
cJSON* tvalue = NULL;
|
cJSON* tvalue = NULL;
|
||||||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||||
char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
|
char* buf = taosMemoryCalloc(pTagVal->nData + 3, 1);
|
||||||
if(!buf) goto end;
|
if (!buf) goto end;
|
||||||
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
|
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
|
||||||
tvalue = cJSON_CreateString(buf);
|
tvalue = cJSON_CreateString(buf);
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
|
@ -2506,8 +2507,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
|
|
||||||
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
||||||
|
|
||||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||||
SCatalog* pCatalog = NULL;
|
SCatalog* pCatalog = NULL;
|
||||||
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
catalogRemoveTableMeta(pCatalog, &tableName);
|
catalogRemoveTableMeta(pCatalog, &tableName);
|
||||||
}
|
}
|
||||||
|
@ -2575,8 +2576,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
|
|
||||||
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
launchQueryImpl(pRequest, &pQuery, true, NULL);
|
||||||
|
|
||||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||||
SCatalog* pCatalog = NULL;
|
SCatalog* pCatalog = NULL;
|
||||||
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
catalogRemoveTableMeta(pCatalog, &tableName);
|
catalogRemoveTableMeta(pCatalog, &tableName);
|
||||||
}
|
}
|
||||||
|
@ -2695,7 +2696,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
if (pRequest->code == TSDB_CODE_SUCCESS){
|
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||||
removeMeta(pTscObj, pRequest->tableList);
|
removeMeta(pTscObj, pRequest->tableList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2812,7 +2813,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
if (pRequest->code == TSDB_CODE_SUCCESS){
|
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||||
removeMeta(pTscObj, pRequest->tableList);
|
removeMeta(pTscObj, pRequest->tableList);
|
||||||
}
|
}
|
||||||
code = pRequest->code;
|
code = pRequest->code;
|
||||||
|
@ -2827,7 +2828,7 @@ end:
|
||||||
|
|
||||||
// delete from db.tabl where .. -> delete from tabl where ..
|
// delete from db.tabl where .. -> delete from tabl where ..
|
||||||
// delete from db .tabl where .. -> delete from tabl where ..
|
// delete from db .tabl where .. -> delete from tabl where ..
|
||||||
//static void getTbName(char *sql){
|
// static void getTbName(char *sql){
|
||||||
// char *ch = sql;
|
// char *ch = sql;
|
||||||
//
|
//
|
||||||
// bool inBackQuote = false;
|
// bool inBackQuote = false;
|
||||||
|
@ -2858,9 +2859,9 @@ end:
|
||||||
//}
|
//}
|
||||||
|
|
||||||
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
SDeleteRes req = {0};
|
SDeleteRes req = {0};
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// decode and process req
|
// decode and process req
|
||||||
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
|
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
|
||||||
|
@ -2871,13 +2872,14 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTbName(req.tableFName);
|
// getTbName(req.tableFName);
|
||||||
char sql[256] = {0};
|
char sql[256] = {0};
|
||||||
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
|
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName,
|
||||||
|
req.skey, req.tsColName, req.ekey);
|
||||||
printf("delete sql:%s\n", sql);
|
printf("delete sql:%s\n", sql);
|
||||||
|
|
||||||
TAOS_RES* res = taos_query(taos, sql);
|
TAOS_RES* res = taos_query(taos, sql);
|
||||||
SRequestObj *pRequest = (SRequestObj *)res;
|
SRequestObj* pRequest = (SRequestObj*)res;
|
||||||
code = pRequest->code;
|
code = pRequest->code;
|
||||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -2985,9 +2987,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pRequest->code == TSDB_CODE_SUCCESS){
|
if (pRequest->code == TSDB_CODE_SUCCESS) {
|
||||||
SExecResult* pRes = &pRequest->body.resInfo.execRes;
|
SExecResult* pRes = &pRequest->body.resInfo.execRes;
|
||||||
if(pRes->res != NULL){
|
if (pRes->res != NULL) {
|
||||||
code = handleAlterTbExecRes(pRes->res, pCatalog);
|
code = handleAlterTbExecRes(pRes->res, pCatalog);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3001,23 +3003,23 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct{
|
typedef struct {
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
void *data;
|
void* data;
|
||||||
}VgData;
|
} VgData;
|
||||||
|
|
||||||
static void destroyVgHash(void* data) {
|
static void destroyVgHash(void* data) {
|
||||||
VgData* vgData = (VgData*)data;
|
VgData* vgData = (VgData*)data;
|
||||||
taosMemoryFreeClear(vgData->data);
|
taosMemoryFreeClear(vgData->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
STableMeta* pTableMeta = NULL;
|
STableMeta* pTableMeta = NULL;
|
||||||
SQuery *pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
|
|
||||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||||
if(!pRequest){
|
if (!pRequest) {
|
||||||
uError("WriteRaw:createRequest error request is null");
|
uError("WriteRaw:createRequest error request is null");
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -3033,9 +3035,9 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||||
strcpy(pName.dbname, pRequest->pDb);
|
strcpy(pName.dbname, pRequest->pDb);
|
||||||
strcpy(pName.tname, tbname);
|
strcpy(pName.tname, tbname);
|
||||||
|
|
||||||
struct SCatalog *pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("WriteRaw: get gatlog error");
|
uError("WriteRaw: get gatlog error");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -3060,17 +3062,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||||
}
|
}
|
||||||
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
|
||||||
uint64_t uid = pTableMeta->uid;
|
uint64_t uid = pTableMeta->uid;
|
||||||
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
|
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
|
||||||
|
|
||||||
uint16_t fLen = 0;
|
uint16_t fLen = 0;
|
||||||
int32_t rowSize = 0;
|
int32_t rowSize = 0;
|
||||||
int16_t nVar = 0;
|
int16_t nVar = 0;
|
||||||
for (int i = 0; i < numOfCols; i++) {
|
for (int i = 0; i < numOfCols; i++) {
|
||||||
SSchema *schema = pTableMeta->schema + i;
|
SSchema* schema = pTableMeta->schema + i;
|
||||||
fLen += TYPE_BYTES[schema->type];
|
fLen += TYPE_BYTES[schema->type];
|
||||||
rowSize += schema->bytes;
|
rowSize += schema->bytes;
|
||||||
if(IS_VAR_DATA_TYPE(schema->type)){
|
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||||
nVar ++;
|
nVar++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3079,22 +3081,22 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||||
int32_t schemaLen = 0;
|
int32_t schemaLen = 0;
|
||||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||||
|
|
||||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||||
SSubmitReq* subReq = taosMemoryCalloc(1, totalLen);
|
SSubmitReq* subReq = taosMemoryCalloc(1, totalLen);
|
||||||
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
|
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
|
||||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||||
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||||
|
|
||||||
SRowBuilder rb = {0};
|
SRowBuilder rb = {0};
|
||||||
tdSRowInit(&rb, pTableMeta->sversion);
|
tdSRowInit(&rb, pTableMeta->sversion);
|
||||||
tdSRowSetTpInfo(&rb, numOfCols, fLen);
|
tdSRowSetTpInfo(&rb, numOfCols, fLen);
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
|
char* pStart = pData + sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
|
||||||
int32_t* colLength = (int32_t*)pStart;
|
int32_t* colLength = (int32_t*)pStart;
|
||||||
pStart += sizeof(int32_t) * numOfCols;
|
pStart += sizeof(int32_t) * numOfCols;
|
||||||
|
|
||||||
SResultColumn *pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
|
SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
|
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
|
||||||
|
@ -3113,7 +3115,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||||
tdSRowResetBuf(&rb, rowData);
|
tdSRowResetBuf(&rb, rowData);
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
for (int32_t k = 0; k < numOfCols; k++) {
|
for (int32_t k = 0; k < numOfCols; k++) {
|
||||||
const SSchema* pColumn = &pTableMeta->schema[k];
|
const SSchema* pColumn = &pTableMeta->schema[k];
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||||
if (pCol[k].offset[j] != -1) {
|
if (pCol[k].offset[j] != -1) {
|
||||||
|
@ -3159,17 +3161,17 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
pQuery->haveResultSet = false;
|
pQuery->haveResultSet = false;
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
if (NULL == pQuery->pRoot) {
|
if (NULL == pQuery->pRoot) {
|
||||||
uError("create pQuery->pRoot error");
|
uError("create pQuery->pRoot error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
|
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||||
if (NULL == dst) {
|
if (NULL == dst) {
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -3183,7 +3185,7 @@ int taos_write_raw_block(TAOS *taos, int rows, char *pData, const char* tbname){
|
||||||
subReq->header.contLen = htonl(subReq->length);
|
subReq->header.contLen = htonl(subReq->length);
|
||||||
subReq->length = htonl(subReq->length);
|
subReq->length = htonl(subReq->length);
|
||||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||||
subReq = NULL; // no need free
|
subReq = NULL; // no need free
|
||||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||||
|
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
@ -3195,16 +3197,16 @@ end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SHashObj *pVgHash = NULL;
|
SHashObj* pVgHash = NULL;
|
||||||
SQuery *pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
SMqRspObj rspObj = {0};
|
SMqRspObj rspObj = {0};
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
|
||||||
if(!pRequest){
|
if (!pRequest) {
|
||||||
uError("WriteRaw:createRequest error request is null");
|
uError("WriteRaw:createRequest error request is null");
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -3214,7 +3216,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
|
|
||||||
tDecoderInit(&decoder, data, dataLen);
|
tDecoderInit(&decoder, data, dataLen);
|
||||||
code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
|
code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
|
||||||
if (code != 0){
|
if (code != 0) {
|
||||||
uError("WriteRaw:decode smqDataRsp error");
|
uError("WriteRaw:decode smqDataRsp error");
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -3228,9 +3230,9 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
|
|
||||||
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
taosHashSetFreeFp(pVgHash, destroyVgHash);
|
||||||
struct SCatalog *pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("WriteRaw: get gatlog error");
|
uError("WriteRaw: get gatlog error");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -3252,20 +3254,20 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
setResSchemaInfo(&rspObj.resInfo, pSW->pSchema, pSW->nCols);
|
||||||
|
|
||||||
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
code = setQueryResultFromRsp(&rspObj.resInfo, pRetrieve, false, false);
|
||||||
if(code != TSDB_CODE_SUCCESS){
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("WriteRaw: setQueryResultFromRsp error");
|
uError("WriteRaw: setQueryResultFromRsp error");
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t fLen = 0;
|
uint16_t fLen = 0;
|
||||||
int32_t rowSize = 0;
|
int32_t rowSize = 0;
|
||||||
int16_t nVar = 0;
|
int16_t nVar = 0;
|
||||||
for (int i = 0; i < pSW->nCols; i++) {
|
for (int i = 0; i < pSW->nCols; i++) {
|
||||||
SSchema *schema = pSW->pSchema + i;
|
SSchema* schema = pSW->pSchema + i;
|
||||||
fLen += TYPE_BYTES[schema->type];
|
fLen += TYPE_BYTES[schema->type];
|
||||||
rowSize += schema->bytes;
|
rowSize += schema->bytes;
|
||||||
if(IS_VAR_DATA_TYPE(schema->type)){
|
if (IS_VAR_DATA_TYPE(schema->type)) {
|
||||||
nVar ++;
|
nVar++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3276,7 +3278,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
|
||||||
|
|
||||||
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter);
|
||||||
if(!tbName){
|
if (!tbName) {
|
||||||
uError("WriteRaw: tbname is null");
|
uError("WriteRaw: tbname is null");
|
||||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -3296,12 +3298,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
|
|
||||||
SSubmitReq* subReq = NULL;
|
SSubmitReq* subReq = NULL;
|
||||||
SSubmitBlk* blk = NULL;
|
SSubmitBlk* blk = NULL;
|
||||||
void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
void* hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId));
|
||||||
if(hData){
|
if (hData) {
|
||||||
vgData = *(VgData*)hData;
|
vgData = *(VgData*)hData;
|
||||||
|
|
||||||
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen;
|
||||||
void *tmp = taosMemoryRealloc(vgData.data, totalLen);
|
void* tmp = taosMemoryRealloc(vgData.data, totalLen);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -3310,15 +3312,15 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
((VgData*)hData)->data = tmp;
|
((VgData*)hData)->data = tmp;
|
||||||
subReq = (SSubmitReq*)(vgData.data);
|
subReq = (SSubmitReq*)(vgData.data);
|
||||||
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
blk = POINTER_SHIFT(vgData.data, subReq->length);
|
||||||
}else{
|
} else {
|
||||||
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
|
||||||
void *tmp = taosMemoryCalloc(1, totalLen);
|
void* tmp = taosMemoryCalloc(1, totalLen);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
vgData.data = tmp;
|
vgData.data = tmp;
|
||||||
taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData));
|
taosHashPut(pVgHash, (const char*)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char*)&vgData, sizeof(vgData));
|
||||||
subReq = (SSubmitReq*)(vgData.data);
|
subReq = (SSubmitReq*)(vgData.data);
|
||||||
subReq->length = sizeof(SSubmitReq);
|
subReq->length = sizeof(SSubmitReq);
|
||||||
subReq->numOfBlocks = 0;
|
subReq->numOfBlocks = 0;
|
||||||
|
@ -3336,7 +3338,7 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
uint64_t uid = pTableMeta->uid;
|
uint64_t uid = pTableMeta->uid;
|
||||||
taosMemoryFreeClear(pTableMeta);
|
taosMemoryFreeClear(pTableMeta);
|
||||||
|
|
||||||
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
|
||||||
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||||
|
|
||||||
SRowBuilder rb = {0};
|
SRowBuilder rb = {0};
|
||||||
|
@ -3352,12 +3354,12 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
for (int32_t k = 0; k < pSW->nCols; k++) {
|
for (int32_t k = 0; k < pSW->nCols; k++) {
|
||||||
const SSchema* pColumn = &pSW->pSchema[k];
|
const SSchema* pColumn = &pSW->pSchema[k];
|
||||||
char *data = rspObj.resInfo.row[k];
|
char* data = rspObj.resInfo.row[k];
|
||||||
if (!data) {
|
if (!data) {
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
|
||||||
} else {
|
} else {
|
||||||
if(IS_VAR_DATA_TYPE(pColumn->type)){
|
if (IS_VAR_DATA_TYPE(pColumn->type)) {
|
||||||
data -= VARSTR_HEADER_SIZE;
|
data -= VARSTR_HEADER_SIZE;
|
||||||
}
|
}
|
||||||
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
|
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
|
||||||
|
@ -3389,21 +3391,21 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
pQuery->haveResultSet = false;
|
pQuery->haveResultSet = false;
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||||
if (NULL == pQuery->pRoot) {
|
if (NULL == pQuery->pRoot) {
|
||||||
uError("create pQuery->pRoot error");
|
uError("create pQuery->pRoot error");
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot);
|
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
|
||||||
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
|
||||||
|
|
||||||
int32_t numOfVg = taosHashGetSize(pVgHash);
|
int32_t numOfVg = taosHashGetSize(pVgHash);
|
||||||
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
|
||||||
|
|
||||||
VgData *vData = (VgData *)taosHashIterate(pVgHash, NULL);
|
VgData* vData = (VgData*)taosHashIterate(pVgHash, NULL);
|
||||||
while (vData) {
|
while (vData) {
|
||||||
SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
|
||||||
if (NULL == dst) {
|
if (NULL == dst) {
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
goto end;
|
goto end;
|
||||||
|
@ -3413,14 +3415,14 @@ static int32_t tmqWriteRaw(TAOS *taos, void* data, int32_t dataLen){
|
||||||
dst->numOfTables = subReq->numOfBlocks;
|
dst->numOfTables = subReq->numOfBlocks;
|
||||||
dst->size = subReq->length;
|
dst->size = subReq->length;
|
||||||
dst->pData = (char*)subReq;
|
dst->pData = (char*)subReq;
|
||||||
vData->data = NULL; // no need free
|
vData->data = NULL; // no need free
|
||||||
subReq->header.vgId = htonl(dst->vg.vgId);
|
subReq->header.vgId = htonl(dst->vg.vgId);
|
||||||
subReq->version = htonl(1);
|
subReq->version = htonl(1);
|
||||||
subReq->header.contLen = htonl(subReq->length);
|
subReq->header.contLen = htonl(subReq->length);
|
||||||
subReq->length = htonl(subReq->length);
|
subReq->length = htonl(subReq->length);
|
||||||
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
subReq->numOfBlocks = htonl(subReq->numOfBlocks);
|
||||||
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
taosArrayPush(nodeStmt->pDataBlocks, &dst);
|
||||||
vData = (VgData *)taosHashIterate(pVgHash, vData);
|
vData = (VgData*)taosHashIterate(pVgHash, vData);
|
||||||
}
|
}
|
||||||
|
|
||||||
launchQueryImpl(pRequest, pQuery, true, NULL);
|
launchQueryImpl(pRequest, pQuery, true, NULL);
|
||||||
|
@ -3459,8 +3461,8 @@ char* tmq_get_json_meta(TAOS_RES* res) {
|
||||||
|
|
||||||
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
|
||||||
|
|
||||||
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
|
||||||
if (!raw || !res){
|
if (!raw || !res) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
if (TD_RES_TMQ_META(res)) {
|
if (TD_RES_TMQ_META(res)) {
|
||||||
|
@ -3468,8 +3470,8 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
||||||
raw->raw = pMetaRspObj->metaRsp.metaRsp;
|
raw->raw = pMetaRspObj->metaRsp.metaRsp;
|
||||||
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
|
raw->raw_len = pMetaRspObj->metaRsp.metaRspLen;
|
||||||
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
|
raw->raw_type = pMetaRspObj->metaRsp.resMsgType;
|
||||||
} else if(TD_RES_TMQ(res)){
|
} else if (TD_RES_TMQ(res)) {
|
||||||
SMqRspObj *rspObj = ((SMqRspObj*)res);
|
SMqRspObj* rspObj = ((SMqRspObj*)res);
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -3478,7 +3480,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *buf = taosMemoryCalloc(1, len);
|
void* buf = taosMemoryCalloc(1, len);
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, len);
|
tEncoderInit(&encoder, buf, len);
|
||||||
tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
|
tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
|
||||||
|
@ -3494,31 +3496,31 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data *raw) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmq_free_raw(tmq_raw_data raw) {
|
void tmq_free_raw(tmq_raw_data raw) {
|
||||||
if (raw.raw_type == RES_TYPE__TMQ){
|
if (raw.raw_type == RES_TYPE__TMQ) {
|
||||||
taosMemoryFree(raw.raw);
|
taosMemoryFree(raw.raw);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw){
|
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
||||||
if (!taos) {
|
if (!taos) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(raw.raw_type == TDMT_VND_CREATE_STB) {
|
if (raw.raw_type == TDMT_VND_CREATE_STB) {
|
||||||
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == TDMT_VND_ALTER_STB){
|
} else if (raw.raw_type == TDMT_VND_ALTER_STB) {
|
||||||
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
return taosCreateStb(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == TDMT_VND_DROP_STB){
|
} else if (raw.raw_type == TDMT_VND_DROP_STB) {
|
||||||
return taosDropStb(taos, raw.raw, raw.raw_len);
|
return taosDropStb(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == TDMT_VND_CREATE_TABLE){
|
} else if (raw.raw_type == TDMT_VND_CREATE_TABLE) {
|
||||||
return taosCreateTable(taos, raw.raw, raw.raw_len);
|
return taosCreateTable(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == TDMT_VND_ALTER_TABLE){
|
} else if (raw.raw_type == TDMT_VND_ALTER_TABLE) {
|
||||||
return taosAlterTable(taos, raw.raw, raw.raw_len);
|
return taosAlterTable(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == TDMT_VND_DROP_TABLE) {
|
} else if (raw.raw_type == TDMT_VND_DROP_TABLE) {
|
||||||
return taosDropTable(taos, raw.raw, raw.raw_len);
|
return taosDropTable(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == TDMT_VND_DELETE){
|
} else if (raw.raw_type == TDMT_VND_DELETE) {
|
||||||
return taosDeleteData(taos, raw.raw, raw.raw_len);
|
return taosDeleteData(taos, raw.raw, raw.raw_len);
|
||||||
}else if(raw.raw_type == RES_TYPE__TMQ){
|
} else if (raw.raw_type == RES_TYPE__TMQ) {
|
||||||
return tmqWriteRaw(taos, raw.raw, raw.raw_len);
|
return tmqWriteRaw(taos, raw.raw, raw.raw_len);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
|
Loading…
Reference in New Issue