feat:add ts,table name from SDeleterRes

This commit is contained in:
wangmm0220 2022-07-27 17:11:57 +08:00
parent 0952c98609
commit 2ba6ac5fba
11 changed files with 51 additions and 47 deletions

View File

@ -3044,7 +3044,8 @@ typedef struct SDeleteRes {
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t affectedRows; int64_t affectedRows;
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableFName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN];
} SDeleteRes; } SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);

View File

@ -38,7 +38,8 @@ typedef struct SDeleterRes {
int64_t skey; int64_t skey;
int64_t ekey; int64_t ekey;
int64_t affectedRows; int64_t affectedRows;
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN];
} SDeleterRes; } SDeleterRes;
typedef struct SDeleterParam { typedef struct SDeleterParam {

View File

@ -151,7 +151,7 @@ typedef struct SVnodeModifyLogicNode {
uint64_t tableId; uint64_t tableId;
uint64_t stableId; uint64_t stableId;
int8_t tableType; // table type int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN];
STimeWindow deleteTimeRange; STimeWindow deleteTimeRange;
SVgroupsInfo* pVgroupList; SVgroupsInfo* pVgroupList;
@ -494,7 +494,7 @@ typedef struct SQueryInserterNode {
uint64_t tableId; uint64_t tableId;
uint64_t stableId; uint64_t stableId;
int8_t tableType; // table type int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
int32_t vgId; int32_t vgId;
SEpSet epSet; SEpSet epSet;
} SQueryInserterNode; } SQueryInserterNode;
@ -503,7 +503,7 @@ typedef struct SDataDeleterNode {
SDataSinkNode sink; SDataSinkNode sink;
uint64_t tableId; uint64_t tableId;
int8_t tableType; // table type int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableFName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN];
STimeWindow deleteTimeRange; STimeWindow deleteTimeRange;
SNode* pAffectedRows; SNode* pAffectedRows;

View File

@ -2823,35 +2823,35 @@ end:
// delete from db.tabl where .. -> delete from tabl where .. // delete from db.tabl where .. -> delete from tabl where ..
// delete from db .tabl where .. -> delete from tabl where .. // delete from db .tabl where .. -> delete from tabl where ..
static void getTbName(char *sql){ //static void getTbName(char *sql){
char *ch = sql; // char *ch = sql;
//
bool inBackQuote = false; // bool inBackQuote = false;
int8_t dotIndex = 0; // int8_t dotIndex = 0;
while(*ch != '\0'){ // while(*ch != '\0'){
if(!inBackQuote && *ch == '`'){ // if(!inBackQuote && *ch == '`'){
inBackQuote = true; // inBackQuote = true;
ch++; // ch++;
continue; // continue;
} // }
//
if(inBackQuote && *ch == '`'){ // if(inBackQuote && *ch == '`'){
inBackQuote = false; // inBackQuote = false;
ch++; // ch++;
//
continue; // continue;
} // }
//
if(!inBackQuote && *ch == '.'){ // if(!inBackQuote && *ch == '.'){
dotIndex ++; // dotIndex ++;
if(dotIndex == 2){ // if(dotIndex == 2){
memmove(sql, ch + 1, strlen(ch + 1) + 1); // memmove(sql, ch + 1, strlen(ch + 1) + 1);
break; // break;
} // }
} // }
ch++; // ch++;
} // }
} //}
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
SDeleteRes req = {0}; SDeleteRes req = {0};
@ -2867,9 +2867,9 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
getTbName(req.tableFName); // getTbName(req.tableFName);
char sql[256] = {0}; char sql[256] = {0};
sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, "ts", req.skey, "ts", req.ekey); sprintf(sql, "delete from `%s` where `%s` >= %" PRId64" and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
printf("delete sql:%s\n", sql); printf("delete sql:%s\n", sql);
TAOS_RES* res = taos_query(taos, sql); TAOS_RES* res = taos_query(taos, sql);

View File

@ -5682,6 +5682,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1; if (tEncodeI64v(pCoder, pRes->affectedRows) < 0) return -1;
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1;
return 0; return 0;
} }
@ -5700,6 +5701,7 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1; if (tDecodeI64v(pCoder, &pRes->affectedRows) < 0) return -1;
if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1;
if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1;
return 0; return 0;
} }
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {

View File

@ -90,7 +90,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
pRes->uidList = pHandle->pParam->pUidList; pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
strcpy(pRes->tableFName, pHandle->pDeleter->tableFName); strcpy(pRes->tableName, pHandle->pDeleter->tableFName);
strcpy(pRes->tsColName, pHandle->pDeleter->tsColName);
pRes->affectedRows = *(int64_t*)pColRes->pData; pRes->affectedRows = *(int64_t*)pColRes->pData;
pBuf->useSize += pEntry->dataLen; pBuf->useSize += pEntry->dataLen;

View File

@ -401,7 +401,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(tableId); COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(stableId); COPY_SCALAR_FIELD(stableId);
COPY_SCALAR_FIELD(tableType); COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName); COPY_CHAR_ARRAY_FIELD(tableName);
COPY_CHAR_ARRAY_FIELD(tsColName); COPY_CHAR_ARRAY_FIELD(tsColName);
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow)); COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);

View File

@ -2326,7 +2326,7 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType); code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName); code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId); code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId);
@ -2355,7 +2355,7 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType); code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName); code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId); code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId);

View File

@ -1292,8 +1292,7 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
pModify->modifyType = MODIFY_TABLE_TYPE_DELETE; pModify->modifyType = MODIFY_TABLE_TYPE_DELETE;
pModify->tableId = pRealTable->pMeta->uid; pModify->tableId = pRealTable->pMeta->uid;
pModify->tableType = pRealTable->pMeta->tableType; pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, snprintf(pModify->tableName, sizeof(pModify->tableName), "%s", pRealTable->table.tableName);
pRealTable->table.dbName, pRealTable->table.tableName);
strcpy(pModify->tsColName, pRealTable->pMeta->schema->name); strcpy(pModify->tsColName, pRealTable->pMeta->schema->name);
pModify->deleteTimeRange = pDelete->timeRange; pModify->deleteTimeRange = pDelete->timeRange;
pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc); pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc);
@ -1343,8 +1342,7 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser
pModify->tableId = pRealTable->pMeta->uid; pModify->tableId = pRealTable->pMeta->uid;
pModify->stableId = pRealTable->pMeta->suid; pModify->stableId = pRealTable->pMeta->suid;
pModify->tableType = pRealTable->pMeta->tableType; pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, snprintf(pModify->tableName, sizeof(pModify->tableName), "%s", pRealTable->table.tableName);
pRealTable->table.dbName, pRealTable->table.tableName);
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList); TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
pModify->pInsertCols = nodesCloneList(pInsert->pCols); pModify->pInsertCols = nodesCloneList(pInsert->pCols);
if (NULL == pModify->pInsertCols) { if (NULL == pModify->pInsertCols) {

View File

@ -1586,7 +1586,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
pInserter->tableId = pModify->tableId; pInserter->tableId = pModify->tableId;
pInserter->stableId = pModify->stableId; pInserter->stableId = pModify->stableId;
pInserter->tableType = pModify->tableType; pInserter->tableType = pModify->tableType;
strcpy(pInserter->tableFName, pModify->tableFName); strcpy(pInserter->tableName, pModify->tableName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
@ -1636,7 +1636,7 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
pDeleter->tableId = pModify->tableId; pDeleter->tableId = pModify->tableId;
pDeleter->tableType = pModify->tableType; pDeleter->tableType = pModify->tableType;
strcpy(pDeleter->tableFName, pModify->tableFName); strcpy(pDeleter->tableFName, pModify->tableName);
strcpy(pDeleter->tsColName, pModify->tsColName); strcpy(pDeleter->tsColName, pModify->tsColName);
pDeleter->deleteTimeRange = pModify->deleteTimeRange; pDeleter->deleteTimeRange = pModify->deleteTimeRange;

View File

@ -283,7 +283,8 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
pRes->skey = pDelRes->skey; pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey; pRes->ekey = pDelRes->ekey;
pRes->affectedRows = pDelRes->affectedRows; pRes->affectedRows = pDelRes->affectedRows;
strcpy(pRes->tableFName, pDelRes->tableFName); strcpy(pRes->tableFName, pDelRes->tableName);
strcpy(pRes->tsColName, pDelRes->tsColName);
taosMemoryFree(output.pData); taosMemoryFree(output.pData);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;