Merge pull request #14635 from taosdata/feature/insertselect

feat: support insert from query res
This commit is contained in:
dapan1121 2022-07-08 16:59:21 +08:00 committed by GitHub
commit 50a6ef7f39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 3777 additions and 3473 deletions

View File

@ -57,8 +57,8 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \ #define TMSG_INFO(TYPE) \
((TYPE) >= 0 && \ ((TYPE) >= 0 && \
((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \
(TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \ (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG)) \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0 : 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
@ -665,6 +665,7 @@ typedef struct {
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion; int32_t sversion;
int32_t tversion; int32_t tversion;
int64_t affectedRows;
} SQueryTableRsp; } SQueryTableRsp;
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
@ -1510,6 +1511,7 @@ typedef struct SSubQueryMsg {
int32_t execId; int32_t execId;
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch;
uint32_t sqlLen; // the query sql, uint32_t sqlLen; // the query sql,
uint32_t phyLen; uint32_t phyLen;
char msg[]; char msg[];

View File

@ -79,195 +79,196 @@
#define TK_EXISTS 61 #define TK_EXISTS 61
#define TK_BUFFER 62 #define TK_BUFFER 62
#define TK_CACHELAST 63 #define TK_CACHELAST 63
#define TK_COMP 64 #define TK_CACHELASTSIZE 64
#define TK_DURATION 65 #define TK_COMP 65
#define TK_NK_VARIABLE 66 #define TK_DURATION 66
#define TK_FSYNC 67 #define TK_NK_VARIABLE 67
#define TK_MAXROWS 68 #define TK_FSYNC 68
#define TK_MINROWS 69 #define TK_MAXROWS 69
#define TK_KEEP 70 #define TK_MINROWS 70
#define TK_PAGES 71 #define TK_KEEP 71
#define TK_PAGESIZE 72 #define TK_PAGES 72
#define TK_PRECISION 73 #define TK_PAGESIZE 73
#define TK_REPLICA 74 #define TK_PRECISION 74
#define TK_STRICT 75 #define TK_REPLICA 75
#define TK_WAL 76 #define TK_STRICT 76
#define TK_VGROUPS 77 #define TK_WAL 77
#define TK_SINGLE_STABLE 78 #define TK_VGROUPS 78
#define TK_RETENTIONS 79 #define TK_SINGLE_STABLE 79
#define TK_SCHEMALESS 80 #define TK_RETENTIONS 80
#define TK_NK_COLON 81 #define TK_SCHEMALESS 81
#define TK_TABLE 82 #define TK_NK_COLON 82
#define TK_NK_LP 83 #define TK_TABLE 83
#define TK_NK_RP 84 #define TK_NK_LP 84
#define TK_STABLE 85 #define TK_NK_RP 85
#define TK_ADD 86 #define TK_STABLE 86
#define TK_COLUMN 87 #define TK_ADD 87
#define TK_MODIFY 88 #define TK_COLUMN 88
#define TK_RENAME 89 #define TK_MODIFY 89
#define TK_TAG 90 #define TK_RENAME 90
#define TK_SET 91 #define TK_TAG 91
#define TK_NK_EQ 92 #define TK_SET 92
#define TK_USING 93 #define TK_NK_EQ 93
#define TK_TAGS 94 #define TK_USING 94
#define TK_COMMENT 95 #define TK_TAGS 95
#define TK_BOOL 96 #define TK_COMMENT 96
#define TK_TINYINT 97 #define TK_BOOL 97
#define TK_SMALLINT 98 #define TK_TINYINT 98
#define TK_INT 99 #define TK_SMALLINT 99
#define TK_INTEGER 100 #define TK_INT 100
#define TK_BIGINT 101 #define TK_INTEGER 101
#define TK_FLOAT 102 #define TK_BIGINT 102
#define TK_DOUBLE 103 #define TK_FLOAT 103
#define TK_BINARY 104 #define TK_DOUBLE 104
#define TK_TIMESTAMP 105 #define TK_BINARY 105
#define TK_NCHAR 106 #define TK_TIMESTAMP 106
#define TK_UNSIGNED 107 #define TK_NCHAR 107
#define TK_JSON 108 #define TK_UNSIGNED 108
#define TK_VARCHAR 109 #define TK_JSON 109
#define TK_MEDIUMBLOB 110 #define TK_VARCHAR 110
#define TK_BLOB 111 #define TK_MEDIUMBLOB 111
#define TK_VARBINARY 112 #define TK_BLOB 112
#define TK_DECIMAL 113 #define TK_VARBINARY 113
#define TK_MAX_DELAY 114 #define TK_DECIMAL 114
#define TK_WATERMARK 115 #define TK_MAX_DELAY 115
#define TK_ROLLUP 116 #define TK_WATERMARK 116
#define TK_TTL 117 #define TK_ROLLUP 117
#define TK_SMA 118 #define TK_TTL 118
#define TK_FIRST 119 #define TK_SMA 119
#define TK_LAST 120 #define TK_FIRST 120
#define TK_SHOW 121 #define TK_LAST 121
#define TK_DATABASES 122 #define TK_SHOW 122
#define TK_TABLES 123 #define TK_DATABASES 123
#define TK_STABLES 124 #define TK_TABLES 124
#define TK_MNODES 125 #define TK_STABLES 125
#define TK_MODULES 126 #define TK_MNODES 126
#define TK_QNODES 127 #define TK_MODULES 127
#define TK_FUNCTIONS 128 #define TK_QNODES 128
#define TK_INDEXES 129 #define TK_FUNCTIONS 129
#define TK_ACCOUNTS 130 #define TK_INDEXES 130
#define TK_APPS 131 #define TK_ACCOUNTS 131
#define TK_CONNECTIONS 132 #define TK_APPS 132
#define TK_LICENCE 133 #define TK_CONNECTIONS 133
#define TK_GRANTS 134 #define TK_LICENCE 134
#define TK_QUERIES 135 #define TK_GRANTS 135
#define TK_SCORES 136 #define TK_QUERIES 136
#define TK_TOPICS 137 #define TK_SCORES 137
#define TK_VARIABLES 138 #define TK_TOPICS 138
#define TK_BNODES 139 #define TK_VARIABLES 139
#define TK_SNODES 140 #define TK_BNODES 140
#define TK_CLUSTER 141 #define TK_SNODES 141
#define TK_TRANSACTIONS 142 #define TK_CLUSTER 142
#define TK_DISTRIBUTED 143 #define TK_TRANSACTIONS 143
#define TK_CONSUMERS 144 #define TK_DISTRIBUTED 144
#define TK_SUBSCRIPTIONS 145 #define TK_CONSUMERS 145
#define TK_LIKE 146 #define TK_SUBSCRIPTIONS 146
#define TK_INDEX 147 #define TK_LIKE 147
#define TK_FUNCTION 148 #define TK_INDEX 148
#define TK_INTERVAL 149 #define TK_FUNCTION 149
#define TK_TOPIC 150 #define TK_INTERVAL 150
#define TK_AS 151 #define TK_TOPIC 151
#define TK_WITH 152 #define TK_AS 152
#define TK_META 153 #define TK_WITH 153
#define TK_CONSUMER 154 #define TK_META 154
#define TK_GROUP 155 #define TK_CONSUMER 155
#define TK_DESC 156 #define TK_GROUP 156
#define TK_DESCRIBE 157 #define TK_DESC 157
#define TK_RESET 158 #define TK_DESCRIBE 158
#define TK_QUERY 159 #define TK_RESET 159
#define TK_CACHE 160 #define TK_QUERY 160
#define TK_EXPLAIN 161 #define TK_CACHE 161
#define TK_ANALYZE 162 #define TK_EXPLAIN 162
#define TK_VERBOSE 163 #define TK_ANALYZE 163
#define TK_NK_BOOL 164 #define TK_VERBOSE 164
#define TK_RATIO 165 #define TK_NK_BOOL 165
#define TK_NK_FLOAT 166 #define TK_RATIO 166
#define TK_COMPACT 167 #define TK_NK_FLOAT 167
#define TK_VNODES 168 #define TK_COMPACT 168
#define TK_IN 169 #define TK_VNODES 169
#define TK_OUTPUTTYPE 170 #define TK_IN 170
#define TK_AGGREGATE 171 #define TK_OUTPUTTYPE 171
#define TK_BUFSIZE 172 #define TK_AGGREGATE 172
#define TK_STREAM 173 #define TK_BUFSIZE 173
#define TK_INTO 174 #define TK_STREAM 174
#define TK_TRIGGER 175 #define TK_INTO 175
#define TK_AT_ONCE 176 #define TK_TRIGGER 176
#define TK_WINDOW_CLOSE 177 #define TK_AT_ONCE 177
#define TK_IGNORE 178 #define TK_WINDOW_CLOSE 178
#define TK_EXPIRED 179 #define TK_IGNORE 179
#define TK_KILL 180 #define TK_EXPIRED 180
#define TK_CONNECTION 181 #define TK_KILL 181
#define TK_TRANSACTION 182 #define TK_CONNECTION 182
#define TK_BALANCE 183 #define TK_TRANSACTION 183
#define TK_VGROUP 184 #define TK_BALANCE 184
#define TK_MERGE 185 #define TK_VGROUP 185
#define TK_REDISTRIBUTE 186 #define TK_MERGE 186
#define TK_SPLIT 187 #define TK_REDISTRIBUTE 187
#define TK_SYNCDB 188 #define TK_SPLIT 188
#define TK_DELETE 189 #define TK_SYNCDB 189
#define TK_INSERT 190 #define TK_DELETE 190
#define TK_NULL 191 #define TK_INSERT 191
#define TK_NK_QUESTION 192 #define TK_NULL 192
#define TK_NK_ARROW 193 #define TK_NK_QUESTION 193
#define TK_ROWTS 194 #define TK_NK_ARROW 194
#define TK_TBNAME 195 #define TK_ROWTS 195
#define TK_QSTARTTS 196 #define TK_TBNAME 196
#define TK_QENDTS 197 #define TK_QSTARTTS 197
#define TK_WSTARTTS 198 #define TK_QENDTS 198
#define TK_WENDTS 199 #define TK_WSTARTTS 199
#define TK_WDURATION 200 #define TK_WENDTS 200
#define TK_CAST 201 #define TK_WDURATION 201
#define TK_NOW 202 #define TK_CAST 202
#define TK_TODAY 203 #define TK_NOW 203
#define TK_TIMEZONE 204 #define TK_TODAY 204
#define TK_CLIENT_VERSION 205 #define TK_TIMEZONE 205
#define TK_SERVER_VERSION 206 #define TK_CLIENT_VERSION 206
#define TK_SERVER_STATUS 207 #define TK_SERVER_VERSION 207
#define TK_CURRENT_USER 208 #define TK_SERVER_STATUS 208
#define TK_COUNT 209 #define TK_CURRENT_USER 209
#define TK_LAST_ROW 210 #define TK_COUNT 210
#define TK_BETWEEN 211 #define TK_LAST_ROW 211
#define TK_IS 212 #define TK_BETWEEN 212
#define TK_NK_LT 213 #define TK_IS 213
#define TK_NK_GT 214 #define TK_NK_LT 214
#define TK_NK_LE 215 #define TK_NK_GT 215
#define TK_NK_GE 216 #define TK_NK_LE 216
#define TK_NK_NE 217 #define TK_NK_GE 217
#define TK_MATCH 218 #define TK_NK_NE 218
#define TK_NMATCH 219 #define TK_MATCH 219
#define TK_CONTAINS 220 #define TK_NMATCH 220
#define TK_JOIN 221 #define TK_CONTAINS 221
#define TK_INNER 222 #define TK_JOIN 222
#define TK_SELECT 223 #define TK_INNER 223
#define TK_DISTINCT 224 #define TK_SELECT 224
#define TK_WHERE 225 #define TK_DISTINCT 225
#define TK_PARTITION 226 #define TK_WHERE 226
#define TK_BY 227 #define TK_PARTITION 227
#define TK_SESSION 228 #define TK_BY 228
#define TK_STATE_WINDOW 229 #define TK_SESSION 229
#define TK_SLIDING 230 #define TK_STATE_WINDOW 230
#define TK_FILL 231 #define TK_SLIDING 231
#define TK_VALUE 232 #define TK_FILL 232
#define TK_NONE 233 #define TK_VALUE 233
#define TK_PREV 234 #define TK_NONE 234
#define TK_LINEAR 235 #define TK_PREV 235
#define TK_NEXT 236 #define TK_LINEAR 236
#define TK_HAVING 237 #define TK_NEXT 237
#define TK_RANGE 238 #define TK_HAVING 238
#define TK_EVERY 239 #define TK_RANGE 239
#define TK_ORDER 240 #define TK_EVERY 240
#define TK_SLIMIT 241 #define TK_ORDER 241
#define TK_SOFFSET 242 #define TK_SLIMIT 242
#define TK_LIMIT 243 #define TK_SOFFSET 243
#define TK_OFFSET 244 #define TK_LIMIT 244
#define TK_ASC 245 #define TK_OFFSET 245
#define TK_NULLS 246 #define TK_ASC 246
#define TK_ID 247 #define TK_NULLS 247
#define TK_NK_BITNOT 248 #define TK_ID 248
#define TK_VALUES 249 #define TK_NK_BITNOT 249
#define TK_IMPORT 250 #define TK_VALUES 250
#define TK_NK_SEMI 251 #define TK_IMPORT 251
#define TK_FILE 252 #define TK_NK_SEMI 252
#define TK_FILE 253
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301

View File

@ -45,6 +45,10 @@ typedef struct SDeleterParam {
SArray* pUidList; SArray* pUidList;
} SDeleterParam; } SDeleterParam;
typedef struct SInserterParam {
SReadHandle* readHandle;
} SInserterParam;
typedef struct SDataSinkStat { typedef struct SDataSinkStat {
uint64_t cachedSize; uint64_t cachedSize;
} SDataSinkStat; } SDataSinkStat;
@ -96,7 +100,7 @@ void dsEndPut(DataSinkHandle handle, uint64_t useconds);
* @param handle * @param handle
* @param pLen data length * @param pLen data length
*/ */
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd); void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd);
/** /**
* Get data, the caller needs to allocate data memory. * Get data, the caller needs to allocate data memory.

View File

@ -157,7 +157,7 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
*/ */
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes);

View File

@ -51,7 +51,8 @@ extern "C" {
typedef struct SDatabaseOptions { typedef struct SDatabaseOptions {
ENodeType type; ENodeType type;
int32_t buffer; int32_t buffer;
int8_t cachelast; int8_t cacheLast;
int32_t cacheLastSize;
int8_t compressionLevel; int8_t compressionLevel;
int32_t daysPerFile; int32_t daysPerFile;
SValueNode* pDaysPerFile; SValueNode* pDaysPerFile;

View File

@ -67,7 +67,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);

View File

@ -2119,3 +2119,4 @@ const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRow
ASSERT(pStart - pData == dataLen); ASSERT(pStart - pData == dataLen);
return pStart; return pStart;
} }

View File

@ -89,7 +89,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
case TDMT_DND_SYSTABLE_RETRIEVE_RSP: case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); case TDMT_VND_SUBMIT_RSP:
qWorkerProcessRspMsg(NULL, NULL, pRpc, 0);
return; return;
case TDMT_MND_STATUS_RSP: case TDMT_MND_STATUS_RSP:
if (pEpSet != NULL) { if (pEpSet != NULL) {

View File

@ -89,9 +89,6 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
case TDMT_SCH_MERGE_FETCH: case TDMT_SCH_MERGE_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;
case TDMT_SCH_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_SCH_CANCEL_TASK: case TDMT_SCH_CANCEL_TASK:
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts); code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break; break;

View File

@ -143,6 +143,7 @@ int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList,
void **pReader); void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds); int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds);
int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbLastrowReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);
// tq // tq

View File

@ -3096,3 +3096,39 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
return rows; return rows;
} }
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t *suid) {
int32_t sversion = 1;
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
metaReaderClear(&mr);
return terrno;
}
*suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) {
*suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
metaReaderClear(&mr);
return terrno;
}
sversion = mr.me.stbEntry.schemaRow.version;
} else {
ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
sversion = mr.me.ntbEntry.schemaRow.version;
}
metaReaderClear(&mr);
*pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
return TSDB_CODE_SUCCESS;
}

View File

@ -296,7 +296,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_SCH_MERGE_FETCH: case TDMT_SCH_MERGE_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0); return qWorkerProcessRspMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_CANCEL_TASK: case TDMT_SCH_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0); return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_DROP_TASK: case TDMT_SCH_DROP_TASK:

View File

@ -34,7 +34,7 @@ typedef struct SDataSinkManager {
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
@ -50,6 +50,7 @@ typedef struct SDataSinkHandle {
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam); int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -885,7 +885,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model); const char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo); int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum); int32_t* resNum);

View File

@ -154,7 +154,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pDeleter->mutex); taosThreadMutexUnlock(&pDeleter->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
if (taosQueueEmpty(pDeleter->pDataBlocks)) { if (taosQueueEmpty(pDeleter->pDataBlocks)) {
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
@ -168,7 +168,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen; *pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {

View File

@ -156,7 +156,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pDispatcher->mutex); taosThreadMutexUnlock(&pDispatcher->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) { if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
@ -170,7 +170,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {

View File

@ -24,195 +24,266 @@
extern SDataSinkStat gDataSinkStat; extern SDataSinkStat gDataSinkStat;
typedef struct SDataInserterBuf { typedef struct SSubmitRes {
int32_t useSize; int64_t affectedRows;
int32_t allocSize; int32_t code;
char* pData; SSubmitRsp *pRsp;
} SDataInserterBuf; } SSubmitRes;
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int32_t numOfCols;
int8_t compressed;
char data[];
} SDataCacheEntry;
typedef struct SDataInserterHandle { typedef struct SDataInserterHandle {
SDataSinkHandle sink; SDataSinkHandle sink;
SDataSinkManager* pManager; SDataSinkManager* pManager;
SDataBlockDescNode* pSchema; STSchema* pSchema;
SDataDeleterNode* pDeleter; SQueryInserterNode* pNode;
SDeleterParam* pParam; SSubmitRes submitRes;
STaosQueue* pDataBlocks; SInserterParam* pParam;
SDataInserterBuf nextOutput; SArray* pDataBlocks;
SHashObj* pCols;
int32_t status; int32_t status;
bool queryEnd; bool queryEnd;
uint64_t useconds; uint64_t useconds;
uint64_t cachedSize; uint64_t cachedSize;
TdThreadMutex mutex; TdThreadMutex mutex;
tsem_t ready;
} SDataInserterHandle; } SDataInserterHandle;
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { typedef struct SSubmitRspParam {
if (tsCompressColData < 0 || 0 == pData->info.rows) { SDataInserterHandle* pInserter;
return false; } SSubmitRspParam;
}
for (int32_t col = 0; col < numOfCols; ++col) { int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); SSubmitRspParam* pParam = (SSubmitRspParam*)param;
int32_t colSize = pColRes->info.bytes * pData->info.rows; SDataInserterHandle* pInserter = pParam->pInserter;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false; pInserter->submitRes.code = code;
}
static void toDataCacheEntry(SDataInserterHandle* pHandle, const SInputData* pInput, SDataInserterBuf* pBuf) {
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = 0;
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
pEntry->dataLen = sizeof(SDeleterRes);
ASSERT(1 == pEntry->numOfRows);
ASSERT(1 == pEntry->numOfCols);
pBuf->useSize = sizeof(SDataCacheEntry);
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
pRes->suid = pHandle->pParam->suid;
pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
pRes->affectedRows = *(int64_t*)pColRes->pData;
pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); if (code == TSDB_CODE_SUCCESS) {
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
} SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp(&coder, pInserter->submitRes.pRsp);
if (code) {
tFreeSSubmitRsp(pInserter->submitRes.pRsp);
pInserter->submitRes.code = code;
goto _return;
}
if (pInserter->submitRes.pRsp->nBlocks > 0) {
for (int32_t i = 0; i < pInserter->submitRes.pRsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = pInserter->submitRes.pRsp->pBlocks + i;
if (TSDB_CODE_SUCCESS != blk->code) {
code = blk->code;
tFreeSSubmitRsp(pInserter->submitRes.pRsp);
pInserter->submitRes.code = code;
goto _return;
}
}
}
pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows;
qDebug("submit rsp received, affectedRows:%d, total:%d", pInserter->submitRes.pRsp->affectedRows, pInserter->submitRes.affectedRows);
static bool allocBuf(SDataInserterHandle* pDeleter, const SInputData* pInput, SDataInserterBuf* pBuf) { tFreeSSubmitRsp(pInserter->submitRes.pRsp);
uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDeleter->pDataBlocks));
return false;
} }
pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes); _return:
pBuf->pData = taosMemoryMalloc(pBuf->allocSize); tsem_post(&pInserter->ready);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); taosMemoryFree(param);
return TSDB_CODE_SUCCESS;
}
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) {
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
} }
return NULL != pBuf->pData; SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
pParam->pInserter = pInserter;
pMsgSendInfo->param = pParam;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback;
int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
} }
static int32_t updateStatus(SDataInserterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex); int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks); const SArray* pBlocks = pInserter->pDataBlocks;
int32_t status = const STSchema* pTSchema = pInserter->pSchema;
(0 == blockNums ? DS_BUF_EMPTY int64_t uid = pInserter->pNode->tableId;
: (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); int64_t suid = pInserter->pNode->stableId;
pDeleter->status = status; int32_t vgId = pInserter->pNode->vgId;
taosThreadMutexUnlock(&pDeleter->mutex); bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);
return status;
SSubmitReq* ret = NULL;
int32_t sz = taosArrayGetSize(pBlocks);
// cal size
int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
}
// assign data
// TODO
ret = taosMemoryCalloc(1, cap);
ret->header.vgId = htonl(vgId);
ret->version = htonl(pTSchema->version);
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);
blkHead->uid = htobe64(uid);
blkHead->schemaLen = htonl(0);
int32_t rows = 0;
int32_t dataLen = 0;
STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
int64_t lastTs = TSKEY_MIN;
bool ignoreRow = false;
for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
ignoreRow = false;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = NULL;
int16_t colIdx = k;
if (!fullCol) {
int16_t *slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId));
if (NULL == slotId) {
continue;
}
colIdx = *slotId;
}
pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
if (pColData->info.type != pColumn->type) {
qError("col type mis-match, schema type:%d, type in block:%d", pColumn->type, pColData->info.type);
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_APP_ERROR;
}
if (colDataIsNull_s(pColData, j)) {
if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
ignoreRow = true;
break;
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
if (*(int64_t*)data == lastTs) {
ignoreRow = true;
break;
} else {
lastTs = *(int64_t*)data;
}
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
}
}
if (ignoreRow) {
continue;
}
rows++;
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
blkHead->dataLen = htonl(dataLen);
blkHead->numOfRows = htons(rows);
ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
}
ret->length = htonl(ret->length);
*pReq = ret;
return TSDB_CODE_SUCCESS;
} }
static int32_t getStatus(SDataInserterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
int32_t status = pDeleter->status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
SDataInserterBuf* pBuf = taosAllocateQitem(sizeof(SDataInserterBuf), DEF_QITEM); taosArrayPush(pInserter->pDataBlocks, &pInput->pData);
if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) { SSubmitReq* pMsg = NULL;
return TSDB_CODE_QRY_OUT_OF_MEMORY; int32_t code = dataBlockToSubmit(pInserter, &pMsg);
if (code) {
return code;
} }
toDataCacheEntry(pDeleter, pInput, pBuf);
taosWriteQitem(pDeleter->pDataBlocks, pBuf); code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false); if (code) {
return code;
}
tsem_wait(&pInserter->ready);
if (pInserter->submitRes.code) {
return pInserter->submitRes.code;
}
*pContinue = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosThreadMutexLock(&pDeleter->mutex); taosThreadMutexLock(&pInserter->mutex);
pDeleter->queryEnd = true; pInserter->queryEnd = true;
pDeleter->useconds = useconds; pInserter->useconds = useconds;
taosThreadMutexUnlock(&pDeleter->mutex); taosThreadMutexUnlock(&pInserter->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
if (taosQueueEmpty(pDeleter->pDataBlocks)) { *pLen = pDispatcher->submitRes.affectedRows;
*pQueryEnd = pDeleter->queryEnd; qDebug("got total affectedRows %" PRId64 , *pLen);
*pLen = 0;
return;
}
SDataInserterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataInserterBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
*pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
if (NULL == pDeleter->nextOutput.pData) {
assert(pDeleter->queryEnd);
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;
pOutput->queryEnd = pDeleter->queryEnd;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDeleter);
taosThreadMutexLock(&pDeleter->mutex);
pOutput->queryEnd = pDeleter->queryEnd;
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
taosThreadMutexUnlock(&pDeleter->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosMemoryFreeClear(pDeleter->nextOutput.pData); taosArrayDestroy(pInserter->pDataBlocks);
while (!taosQueueEmpty(pDeleter->pDataBlocks)) { taosMemoryFree(pInserter->pSchema);
SDataInserterBuf* pBuf = NULL; taosThreadMutexDestroy(&pInserter->mutex);
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -230,25 +301,46 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink; SQueryInserterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
inserter->sink.fPut = putDataBlock; inserter->sink.fPut = putDataBlock;
inserter->sink.fEndPut = endPut; inserter->sink.fEndPut = endPut;
inserter->sink.fGetLen = getDataLength; inserter->sink.fGetLen = getDataLength;
inserter->sink.fGetData = getDataBlock; inserter->sink.fGetData = NULL;
inserter->sink.fDestroy = destroyDataSinker; inserter->sink.fDestroy = destroyDataSinker;
inserter->sink.fGetCacheSize = getCacheSize; inserter->sink.fGetCacheSize = getCacheSize;
inserter->pManager = pManager; inserter->pManager = pManager;
inserter->pDeleter = pDeleterNode; inserter->pNode = pInserterNode;
inserter->pSchema = pDataSink->pInputDataBlockDesc;
inserter->pParam = pParam; inserter->pParam = pParam;
inserter->status = DS_BUF_EMPTY; inserter->status = DS_BUF_EMPTY;
inserter->queryEnd = false; inserter->queryEnd = false;
inserter->pDataBlocks = taosOpenQueue();
int64_t suid = 0;
int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) {
return code;
}
if (pInserterNode->stableId != suid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return terrno;
}
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
taosThreadMutexInit(&inserter->mutex, NULL); taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) { if (NULL == inserter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
SNode* pNode = NULL;
FOREACH(pNode, pInserterNode->pCols) {
SColumnNode* pCol = (SColumnNode*)pNode;
taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
}
tsem_init(&inserter->ready, 0, 0);
*pHandle = inserter; *pHandle = inserter;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -40,6 +40,8 @@ int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHand
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam); return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
} }
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -54,7 +56,7 @@ void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
return pHandleImpl->fEndPut(pHandleImpl, useconds); return pHandleImpl->fEndPut(pHandleImpl, useconds);
} }
void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
} }

View File

@ -52,7 +52,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
if (handle) { if (handle) {
void* pSinkParam = NULL; void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo); code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }

View File

@ -1994,7 +1994,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
taosMemoryFreeClear(pMsgBody); taosMemoryFreeClear(pMsgBody);
} }
void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL); assert(pMsg->info.ahandle != NULL);
@ -2463,6 +2463,8 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pInfo->binfo.pRes); blockDataDestroy(pInfo->binfo.pRes);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosMemoryFreeClear(param);
} }
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
@ -3504,7 +3506,6 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
} }
taosMemoryFreeClear(pOperator->exprSupp.pExprInfo); taosMemoryFreeClear(pOperator->exprSupp.pExprInfo);
taosMemoryFreeClear(pOperator->info);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
} }
@ -3674,11 +3675,15 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
cleanupBasicInfo(pInfo); cleanupBasicInfo(pInfo);
taosMemoryFreeClear(param);
} }
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(param);
} }
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
@ -3686,6 +3691,8 @@ void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(pInfo->p); taosMemoryFreeClear(pInfo->p);
taosMemoryFreeClear(param);
} }
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
@ -3696,6 +3703,8 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
taosMemoryFreeClear(param);
} }
void cleanupExprSupp(SExprSupp* pSupp) { void cleanupExprSupp(SExprSupp* pSupp) {
@ -3712,6 +3721,8 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo); taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
} }
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
@ -3729,6 +3740,8 @@ void doDestroyExchangeOperatorInfo(void* param) {
} }
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(param);
} }
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) { static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
@ -4772,10 +4785,20 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo) { int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
switch (pNode->type) { switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
if (NULL == pInserterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserterParam->readHandle = readHandle;
*pParam = pInserterParam;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam)); SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
if (NULL == pDeleterParam) { if (NULL == pDeleterParam) {

View File

@ -38,6 +38,8 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
} }
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
@ -724,6 +726,8 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree(pInfo->columnOffset); taosMemoryFree(pInfo->columnOffset);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
taosMemoryFreeClear(param);
} }
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
@ -806,4 +810,4 @@ int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo,
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset); setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -104,6 +104,8 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
nodesDestroyNode(pJoinOperator->pCondAfterMerge); nodesDestroyNode(pJoinOperator->pCondAfterMerge);
taosMemoryFreeClear(param);
} }
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {

View File

@ -595,6 +595,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
if (pTableScanInfo->pColMatchInfo != NULL) { if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo); taosArrayDestroy(pTableScanInfo->pColMatchInfo);
} }
taosMemoryFreeClear(param);
} }
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
@ -743,6 +745,8 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) {
SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param;
blockDataDestroy(pDistInfo->pResBlock); blockDataDestroy(pDistInfo->pResBlock);
taosMemoryFreeClear(param);
} }
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
@ -1493,6 +1497,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->scanCols); taosArrayDestroy(pInfo->scanCols);
taosMemoryFreeClear(pInfo->pUser); taosMemoryFreeClear(pInfo->pUser);
taosMemoryFreeClear(param);
} }
static int32_t getSysTableDbNameColId(const char* pTable) { static int32_t getSysTableDbNameColId(const char* pTable) {
@ -2171,6 +2177,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*)param; STagScanInfo* pInfo = (STagScanInfo*)param;
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosMemoryFreeClear(param);
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
@ -2661,6 +2669,8 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo); taosArrayDestroy(pTableScanInfo->pSortInfo);
taosMemoryFreeClear(param);
} }
typedef struct STableMergeScanExecInfo { typedef struct STableMergeScanExecInfo {
@ -2792,6 +2802,8 @@ static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
tsdbLastrowReaderClose(pInfo->pLastrowReader); tsdbLastrowReaderClose(pInfo->pLastrowReader);
taosMemoryFreeClear(param);
} }
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,

View File

@ -235,6 +235,8 @@ void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
} }
int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
@ -451,6 +453,8 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
} }
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
@ -670,6 +674,8 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
taosMemoryFreeClear(param);
} }
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {

View File

@ -1557,6 +1557,8 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->stateKey.pData); taosMemoryFreeClear(pInfo->stateKey.pData);
taosMemoryFreeClear(param);
} }
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
@ -1564,6 +1566,8 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pRecycledPages); taosArrayDestroy(pInfo->pRecycledPages);
taosMemoryFreeClear(param);
} }
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
@ -1586,6 +1590,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
nodesDestroyNode((SNode*)pInfo->pPhyNode); nodesDestroyNode((SNode*)pInfo->pPhyNode);
taosMemoryFreeClear(param);
} }
static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
@ -2319,6 +2325,8 @@ _error:
void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param; SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param;
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(param);
} }
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@ -2995,6 +3003,8 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChInfo); taosMemoryFreeClear(pChInfo);
} }
} }
taosMemoryFreeClear(param);
} }
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
@ -3954,6 +3964,8 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pChInfo); taosMemoryFreeClear(pChInfo);
} }
} }
taosMemoryFreeClear(param);
} }
int64_t getStateWinTsKey(void* data, int32_t index) { int64_t getStateWinTsKey(void* data, int32_t index) {
@ -4357,7 +4369,7 @@ _error:
} }
typedef struct SMergeAlignedIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo intervalAggOperatorInfo; SIntervalAggOperatorInfo *intervalAggOperatorInfo;
bool hasGroupId; bool hasGroupId;
uint64_t groupId; uint64_t groupId;
@ -4367,13 +4379,15 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param; SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param;
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo, numOfOutput);
taosMemoryFreeClear(param);
} }
static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId,
SSDataBlock* pResultBlock, TSKEY wstartTs) { SSDataBlock* pResultBlock, TSKEY wstartTs) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
@ -4394,7 +4408,7 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo,
SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) { SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) {
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
SExprSupp* pSup = &pOperatorInfo->exprSupp; SExprSupp* pSup = &pOperatorInfo->exprSupp;
@ -4459,7 +4473,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info; SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
@ -4525,7 +4539,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
goto _error; goto _error;
} }
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; miaInfo->intervalAggOperatorInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
if (miaInfo->intervalAggOperatorInfo == NULL) {
goto _error;
}
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
iaInfo->win = pTaskInfo->window; iaInfo->win = pTaskInfo->window;
@ -4602,6 +4621,8 @@ void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
tdListFree(miaInfo->groupIntervals); tdListFree(miaInfo->groupIntervals);
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
taosMemoryFreeClear(param);
} }
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win, static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,

View File

@ -3605,7 +3605,7 @@ static int32_t databaseOptionsToJson(const void* pObj, SJson* pJson) {
int32_t code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsBuffer, pNode->buffer); int32_t code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsBuffer, pNode->buffer);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsCachelast, pNode->cachelast); code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsCachelast, pNode->cacheLast);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsCompressionLevel, pNode->compressionLevel); code = tjsonAddIntegerToObject(pJson, jkDatabaseOptionsCompressionLevel, pNode->compressionLevel);
@ -3667,7 +3667,7 @@ static int32_t jsonToDatabaseOptions(const SJson* pJson, void* pObj) {
int32_t code = tjsonGetIntValue(pJson, jkDatabaseOptionsBuffer, &pNode->buffer); int32_t code = tjsonGetIntValue(pJson, jkDatabaseOptionsBuffer, &pNode->buffer);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsCachelast, &pNode->cachelast); code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsCachelast, &pNode->cacheLast);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsCompressionLevel, &pNode->compressionLevel); code = tjsonGetTinyIntValue(pJson, jkDatabaseOptionsCompressionLevel, &pNode->compressionLevel);

View File

@ -39,6 +39,7 @@ typedef struct SAstCreateContext {
typedef enum EDatabaseOptionType { typedef enum EDatabaseOptionType {
DB_OPTION_BUFFER = 1, DB_OPTION_BUFFER = 1,
DB_OPTION_CACHELAST, DB_OPTION_CACHELAST,
DB_OPTION_CACHELASTSIZE,
DB_OPTION_COMP, DB_OPTION_COMP,
DB_OPTION_DAYS, DB_OPTION_DAYS,
DB_OPTION_FSYNC, DB_OPTION_FSYNC,

View File

@ -172,6 +172,7 @@ exists_opt(A) ::= .
db_options(A) ::= . { A = createDefaultDatabaseOptions(pCxt); } db_options(A) ::= . { A = createDefaultDatabaseOptions(pCxt); }
db_options(A) ::= db_options(B) BUFFER NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_BUFFER, &C); } db_options(A) ::= db_options(B) BUFFER NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_BUFFER, &C); }
db_options(A) ::= db_options(B) CACHELAST NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELAST, &C); } db_options(A) ::= db_options(B) CACHELAST NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELAST, &C); }
db_options(A) ::= db_options(B) CACHELASTSIZE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELASTSIZE, &C); }
db_options(A) ::= db_options(B) COMP NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_COMP, &C); } db_options(A) ::= db_options(B) COMP NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_COMP, &C); }
db_options(A) ::= db_options(B) DURATION NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); } db_options(A) ::= db_options(B) DURATION NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); }
db_options(A) ::= db_options(B) DURATION NK_VARIABLE(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); } db_options(A) ::= db_options(B) DURATION NK_VARIABLE(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_DAYS, &C); }
@ -198,6 +199,7 @@ alter_db_options(A) ::= alter_db_options(B) alter_db_option(C).
%destructor alter_db_option { } %destructor alter_db_option { }
alter_db_option(A) ::= BUFFER NK_INTEGER(B). { A.type = DB_OPTION_BUFFER; A.val = B; } alter_db_option(A) ::= BUFFER NK_INTEGER(B). { A.type = DB_OPTION_BUFFER; A.val = B; }
alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.val = B; } alter_db_option(A) ::= CACHELAST NK_INTEGER(B). { A.type = DB_OPTION_CACHELAST; A.val = B; }
alter_db_option(A) ::= CACHELASTSIZE NK_INTEGER(B). { A.type = DB_OPTION_CACHELASTSIZE; A.val = B; }
alter_db_option(A) ::= FSYNC NK_INTEGER(B). { A.type = DB_OPTION_FSYNC; A.val = B; } alter_db_option(A) ::= FSYNC NK_INTEGER(B). { A.type = DB_OPTION_FSYNC; A.val = B; }
alter_db_option(A) ::= KEEP integer_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; } alter_db_option(A) ::= KEEP integer_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
alter_db_option(A) ::= KEEP variable_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; } alter_db_option(A) ::= KEEP variable_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }

View File

@ -746,7 +746,8 @@ SNode* createDefaultDatabaseOptions(SAstCreateContext* pCxt) {
SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS); SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS);
CHECK_OUT_OF_MEM(pOptions); CHECK_OUT_OF_MEM(pOptions);
pOptions->buffer = TSDB_DEFAULT_BUFFER_PER_VNODE; pOptions->buffer = TSDB_DEFAULT_BUFFER_PER_VNODE;
pOptions->cachelast = TSDB_DEFAULT_CACHE_LAST_ROW; pOptions->cacheLast = TSDB_DEFAULT_CACHE_LAST_ROW;
pOptions->cacheLastSize = TSDB_DEFAULT_LAST_ROW_MEM;
pOptions->compressionLevel = TSDB_DEFAULT_COMP_LEVEL; pOptions->compressionLevel = TSDB_DEFAULT_COMP_LEVEL;
pOptions->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; pOptions->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
pOptions->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; pOptions->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
@ -772,7 +773,8 @@ SNode* createAlterDatabaseOptions(SAstCreateContext* pCxt) {
SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS); SDatabaseOptions* pOptions = (SDatabaseOptions*)nodesMakeNode(QUERY_NODE_DATABASE_OPTIONS);
CHECK_OUT_OF_MEM(pOptions); CHECK_OUT_OF_MEM(pOptions);
pOptions->buffer = -1; pOptions->buffer = -1;
pOptions->cachelast = -1; pOptions->cacheLast = -1;
pOptions->cacheLastSize = -1;
pOptions->compressionLevel = -1; pOptions->compressionLevel = -1;
pOptions->daysPerFile = -1; pOptions->daysPerFile = -1;
pOptions->fsyncPeriod = -1; pOptions->fsyncPeriod = -1;
@ -800,7 +802,10 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti
((SDatabaseOptions*)pOptions)->buffer = taosStr2Int32(((SToken*)pVal)->z, NULL, 10); ((SDatabaseOptions*)pOptions)->buffer = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break; break;
case DB_OPTION_CACHELAST: case DB_OPTION_CACHELAST:
((SDatabaseOptions*)pOptions)->cachelast = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); ((SDatabaseOptions*)pOptions)->cacheLast = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_CACHELASTSIZE:
((SDatabaseOptions*)pOptions)->cacheLastSize = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break; break;
case DB_OPTION_COMP: case DB_OPTION_COMP:
((SDatabaseOptions*)pOptions)->compressionLevel = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); ((SDatabaseOptions*)pOptions)->compressionLevel = taosStr2Int8(((SToken*)pVal)->z, NULL, 10);

View File

@ -53,6 +53,7 @@ static SKeyword keywordTable[] = {
{"BY", TK_BY}, {"BY", TK_BY},
{"CACHE", TK_CACHE}, {"CACHE", TK_CACHE},
{"CACHELAST", TK_CACHELAST}, {"CACHELAST", TK_CACHELAST},
{"CACHELASTSIZE", TK_CACHELASTSIZE},
{"CAST", TK_CAST}, {"CAST", TK_CAST},
{"CLIENT_VERSION", TK_CLIENT_VERSION}, {"CLIENT_VERSION", TK_CLIENT_VERSION},
{"CLUSTER", TK_CLUSTER}, {"CLUSTER", TK_CLUSTER},

View File

@ -2988,7 +2988,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq->compression = pStmt->pOptions->compressionLevel; pReq->compression = pStmt->pOptions->compressionLevel;
pReq->replications = pStmt->pOptions->replica; pReq->replications = pStmt->pOptions->replica;
pReq->strict = pStmt->pOptions->strict; pReq->strict = pStmt->pOptions->strict;
pReq->cacheLastRow = pStmt->pOptions->cachelast; pReq->cacheLastRow = pStmt->pOptions->cacheLast;
pReq->lastRowMem = pStmt->pOptions->cacheLastSize;
pReq->schemaless = pStmt->pOptions->schemaless; pReq->schemaless = pStmt->pOptions->schemaless;
pReq->ignoreExist = pStmt->ignoreExists; pReq->ignoreExist = pStmt->ignoreExists;
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq); return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
@ -3149,9 +3150,13 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
int32_t code = int32_t code =
checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE); checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "cacheLast", pOptions->cachelast, TSDB_MIN_DB_CACHE_LAST_ROW, code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST_ROW,
TSDB_MAX_DB_CACHE_LAST_ROW); TSDB_MAX_DB_CACHE_LAST_ROW);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_LAST_ROW_MEM,
TSDB_MAX_DB_LAST_ROW_MEM);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "compression", pOptions->compressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL); code = checkRangeOption(pCxt, "compression", pOptions->compressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
} }
@ -3271,7 +3276,8 @@ static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt,
pReq->fsyncPeriod = pStmt->pOptions->fsyncPeriod; pReq->fsyncPeriod = pStmt->pOptions->fsyncPeriod;
pReq->walLevel = pStmt->pOptions->walLevel; pReq->walLevel = pStmt->pOptions->walLevel;
pReq->strict = pStmt->pOptions->strict; pReq->strict = pStmt->pOptions->strict;
pReq->cacheLastRow = pStmt->pOptions->cachelast; pReq->cacheLastRow = pStmt->pOptions->cacheLast;
pReq->lastRowMem = pStmt->pOptions->cacheLastSize;
pReq->replications = pStmt->pOptions->replica; pReq->replications = pStmt->pOptions->replica;
return; return;
} }

View File

@ -39,6 +39,9 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
if (TK_USING == t.type || TK_VALUES == t.type) { if (TK_USING == t.type || TK_VALUES == t.type) {
return true; return true;
} }
if (0 == t.type) {
break;
}
} while (pStr - pSql < length); } while (pStr - pSql < length);
return false; return false;
} }

File diff suppressed because it is too large Load Diff

View File

@ -77,6 +77,7 @@ TEST_F(ParserInitialCTest, createDatabase) {
expect.ignoreExist = igExists; expect.ignoreExist = igExists;
expect.buffer = TSDB_DEFAULT_BUFFER_PER_VNODE; expect.buffer = TSDB_DEFAULT_BUFFER_PER_VNODE;
expect.cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; expect.cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
expect.lastRowMem = TSDB_DEFAULT_LAST_ROW_MEM;
expect.compression = TSDB_DEFAULT_COMP_LEVEL; expect.compression = TSDB_DEFAULT_COMP_LEVEL;
expect.daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE; expect.daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
expect.fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; expect.fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
@ -97,7 +98,8 @@ TEST_F(ParserInitialCTest, createDatabase) {
}; };
auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; }; auto setDbBufferFunc = [&](int32_t buffer) { expect.buffer = buffer; };
auto setDbCachelastFunc = [&](int8_t CACHELAST) { expect.cacheLastRow = CACHELAST; }; auto setDbCachelastFunc = [&](int8_t cachelast) { expect.cacheLastRow = cachelast; };
auto setDbCachelastSize = [&](int8_t cachelastSize) { expect.lastRowMem = cachelastSize; };
auto setDbCompressionFunc = [&](int8_t compressionLevel) { expect.compression = compressionLevel; }; auto setDbCompressionFunc = [&](int8_t compressionLevel) { expect.compression = compressionLevel; };
auto setDbDaysFunc = [&](int32_t daysPerFile) { expect.daysPerFile = daysPerFile; }; auto setDbDaysFunc = [&](int32_t daysPerFile) { expect.daysPerFile = daysPerFile; };
auto setDbFsyncFunc = [&](int32_t fsyncPeriod) { expect.fsyncPeriod = fsyncPeriod; }; auto setDbFsyncFunc = [&](int32_t fsyncPeriod) { expect.fsyncPeriod = fsyncPeriod; };
@ -154,6 +156,7 @@ TEST_F(ParserInitialCTest, createDatabase) {
ASSERT_EQ(req.replications, expect.replications); ASSERT_EQ(req.replications, expect.replications);
ASSERT_EQ(req.strict, expect.strict); ASSERT_EQ(req.strict, expect.strict);
ASSERT_EQ(req.cacheLastRow, expect.cacheLastRow); ASSERT_EQ(req.cacheLastRow, expect.cacheLastRow);
ASSERT_EQ(req.lastRowMem, expect.lastRowMem);
// ASSERT_EQ(req.schemaless, expect.schemaless); // ASSERT_EQ(req.schemaless, expect.schemaless);
ASSERT_EQ(req.ignoreExist, expect.ignoreExist); ASSERT_EQ(req.ignoreExist, expect.ignoreExist);
ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions); ASSERT_EQ(req.numOfRetensions, expect.numOfRetensions);
@ -179,6 +182,7 @@ TEST_F(ParserInitialCTest, createDatabase) {
setCreateDbReqFunc("wxy_db", 1); setCreateDbReqFunc("wxy_db", 1);
setDbBufferFunc(64); setDbBufferFunc(64);
setDbCachelastFunc(2); setDbCachelastFunc(2);
setDbCachelastSize(20);
setDbCompressionFunc(1); setDbCompressionFunc(1);
setDbDaysFunc(100 * 1440); setDbDaysFunc(100 * 1440);
setDbFsyncFunc(100); setDbFsyncFunc(100);
@ -200,6 +204,7 @@ TEST_F(ParserInitialCTest, createDatabase) {
run("CREATE DATABASE IF NOT EXISTS wxy_db " run("CREATE DATABASE IF NOT EXISTS wxy_db "
"BUFFER 64 " "BUFFER 64 "
"CACHELAST 2 " "CACHELAST 2 "
"CACHELASTSIZE 20 "
"COMP 1 " "COMP 1 "
"DURATION 100 " "DURATION 100 "
"FSYNC 100 " "FSYNC 100 "

View File

@ -1340,6 +1340,17 @@ static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); } static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
static void setLogicSubplanType(SLogicSubplan* pSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pSubplan->pNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
} else {
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
pSubplan->subplanType = (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren)
? SUBPLAN_TYPE_SCAN
: SUBPLAN_TYPE_MODIFY;
}
}
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
SLogicPlanContext cxt = {.pPlanCxt = pCxt}; SLogicPlanContext cxt = {.pPlanCxt = pCxt};
@ -1354,11 +1365,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode); int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setLogicNodeParent(pSubplan->pNode); setLogicNodeParent(pSubplan->pNode);
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pSubplan->pNode)) { setLogicSubplanType(pSubplan);
pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
} else {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -1566,7 +1566,7 @@ static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink); code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
} }
pSubplan->msgType = TDMT_VND_SUBMIT; pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
return code; return code;
} }

View File

@ -39,7 +39,6 @@ typedef struct SSplitRule {
FSplit splitFunc; FSplit splitFunc;
} SSplitRule; } SSplitRule;
// typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo); typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) { static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
@ -67,6 +66,19 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNod
return pSubplan; return pSubplan;
} }
static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, ESubplanType subplanType) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return NULL;
}
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = subplanType;
pSubplan->pNode = pNode;
pNode->pParent = NULL;
return pSubplan;
}
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) { static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) { if (NULL == pExchange) {
@ -98,6 +110,43 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
return code; return code;
} }
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
}
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
return ((SMergeLogicNode*)pLogicNode)->srcGroupId == groupId;
}
SNode* pChild;
FOREACH(pChild, pLogicNode->pChildren) {
bool isChild = splIsChildSubplan((SLogicNode*)pChild, groupId);
if (isChild) {
return isChild;
}
}
return false;
}
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
SNode* pChild = NULL;
WHERE_EACH(pChild, pChildren) {
if (splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL);
ERASE_NODE(pChildren);
continue;
} else {
return code;
}
}
WHERE_NEXT;
}
return TSDB_CODE_SUCCESS;
}
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func, static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
void* pInfo) { void* pInfo) {
if (func(pCxt, pSubplan, pNode, pInfo)) { if (func(pCxt, pSubplan, pNode, pInfo)) {
@ -982,56 +1031,6 @@ static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan
return code; return code;
} }
static bool unionIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
}
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
return ((SMergeLogicNode*)pLogicNode)->srcGroupId == groupId;
}
SNode* pChild;
FOREACH(pChild, pLogicNode->pChildren) {
bool isChild = unionIsChildSubplan((SLogicNode*)pChild, groupId);
if (isChild) {
return isChild;
}
}
return false;
}
static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
SNode* pChild = NULL;
WHERE_EACH(pChild, pChildren) {
if (unionIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL);
ERASE_NODE(pChildren);
continue;
} else {
return code;
}
}
WHERE_NEXT;
}
return TSDB_CODE_SUCCESS;
}
static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, ESubplanType subplanType) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return NULL;
}
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = subplanType;
pSubplan->pNode = pNode;
pNode->pParent = NULL;
return pSubplan;
}
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) { static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
SNodeList* pSubplanChildren = pUnionSubplan->pChildren; SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
pUnionSubplan->pChildren = NULL; pUnionSubplan->pChildren = NULL;
@ -1040,11 +1039,11 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
SNode* pChild = NULL; SNode* pChild = NULL;
FOREACH(pChild, pSplitNode->pChildren) { FOREACH(pChild, pSplitNode->pChildren) {
SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild, pUnionSubplan->subplanType); SLogicSubplan* pNewSubplan = splCreateSubplan(pCxt, (SLogicNode*)pChild, pUnionSubplan->subplanType);
code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan); code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL); REPLACE_NODE(NULL);
code = unionMountSubplan(pNewSubplan, pSubplanChildren); code = splMountSubplan(pNewSubplan, pSubplanChildren);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
@ -1219,14 +1218,24 @@ static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, info.pSubplan->subplanType); SLogicSubplan* pNewSubplan = NULL;
SNodeList* pSubplanChildren = info.pSubplan->pChildren;
ESubplanType subplanType = info.pSubplan->subplanType;
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pQueryRoot, 0)); pNewSubplan = splCreateSubplan(pCxt, info.pQueryRoot, subplanType);
if (NULL == pNewSubplan) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
info.pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pNewSubplan);
SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
} }
if (TSDB_CODE_SUCCESS == code) {
code = splMountSubplan(pNewSubplan, pSubplanChildren);
}
SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
++(pCxt->groupId); ++(pCxt->groupId);
pCxt->split = true; pCxt->split = true;
return code; return code;

View File

@ -80,12 +80,19 @@ typedef struct SQWDebug {
extern SQWDebug gQWDebug; extern SQWDebug gQWDebug;
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;
typedef struct SQWMsg { typedef struct SQWMsg {
void *node; void *node;
int32_t code; int32_t code;
int32_t msgType; int32_t msgType;
char *msg; char *msg;
int32_t msgLen; int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo; SRpcHandleInfo connInfo;
} SQWMsg; } SQWMsg;
@ -122,15 +129,18 @@ typedef struct SQWTaskCtx {
int8_t phase; int8_t phase;
int8_t taskType; int8_t taskType;
int8_t explain; int8_t explain;
int8_t needFetch;
int32_t queryType; int32_t queryType;
int32_t fetchType; int32_t fetchType;
int32_t execId; int32_t execId;
bool queryRsped;
bool queryFetched; bool queryFetched;
bool queryEnd; bool queryEnd;
bool queryContinue; bool queryContinue;
bool queryInQueue; bool queryInQueue;
int32_t rspCode; int32_t rspCode;
int64_t affectedRows; // for insert ...select stmt
SRpcHandleInfo ctrlConnInfo; SRpcHandleInfo ctrlConnInfo;
SRpcHandleInfo dataConnInfo; SRpcHandleInfo dataConnInfo;
@ -162,7 +172,7 @@ typedef struct SQWMsgStat {
uint64_t queryProcessed; uint64_t queryProcessed;
uint64_t cqueryProcessed; uint64_t cqueryProcessed;
uint64_t fetchProcessed; uint64_t fetchProcessed;
uint64_t fetchRspProcessed; uint64_t rspProcessed;
uint64_t cancelProcessed; uint64_t cancelProcessed;
uint64_t dropProcessed; uint64_t dropProcessed;
uint64_t hbProcessed; uint64_t hbProcessed;
@ -212,8 +222,8 @@ typedef struct SQWorkerMgmt {
#define QW_STAT_GET(_item) atomic_load_64(&(_item)) #define QW_STAT_GET(_item) atomic_load_64(&(_item))
#define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) #define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) #define QW_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) #define QW_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
@ -222,13 +232,8 @@ typedef struct SQWorkerMgmt {
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) #define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCC || status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PART_SUCC)
#define QW_SET_QTID(id, qId, tId, eId) \ #define QW_SET_QTID(id, qId, tId, eId) \
do { \ do { \
*(uint64_t *)(id) = (qId); \ *(uint64_t *)(id) = (qId); \

View File

@ -25,7 +25,7 @@ extern "C" {
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF);
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
@ -38,7 +38,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);

View File

@ -43,13 +43,16 @@ void qwFreeFetchRsp(void *msg) {
} }
} }
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo) { int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL;
int64_t affectedRows = ctx ? ctx->affectedRows : 0;
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = htonl(code);
pRsp->affectedRows = htobe64(affectedRows);
if (tbInfo) { if (tbInfo) {
strcpy(pRsp->tbFName, tbInfo->tbFName); strcpy(pRsp->tbFName, tbInfo->tbFName);
pRsp->sversion = tbInfo->sversion; pRsp->sversion = htonl(tbInfo->sversion);
pRsp->tversion = tbInfo->tversion; pRsp->tversion = htonl(tbInfo->tversion);
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
@ -366,10 +369,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int32_t eId = msg->execId; int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType};
qwMsg.msgInfo.explain = msg->explain;
qwMsg.msgInfo.taskType = msg->taskType;
qwMsg.msgInfo.needFetch = msg->needFetch;
char * sql = strndup(msg->msg, msg->sqlLen); char * sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql)); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node); QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -447,14 +454,14 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
SQWorker * mgmt = (SQWorker *)qWorkerMgmt; SQWorker * mgmt = (SQWorker *)qWorkerMgmt;
if (mgmt) { if (mgmt) {
qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE);
QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
} }
qProcessFetchRsp(NULL, pMsg, NULL); qProcessRspMsg(NULL, pMsg, NULL);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -57,6 +57,10 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
connInfo.ahandle = NULL; connInfo.ahandle = NULL;
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum)); QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
} }
if (!ctx->needFetch) {
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -123,11 +127,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) { if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
break; break;
} }
@ -184,7 +188,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
} }
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
int32_t len = 0; int64_t len = 0;
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
@ -243,7 +247,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) { int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
int32_t len = 0; int64_t len = 0;
bool queryEnd = false; bool queryEnd = false;
int32_t code = 0; int32_t code = 0;
SOutputData output = {0}; SOutputData output = {0};
@ -251,7 +255,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len <= 0 || len != sizeof(SDeleterRes)) { if (len <= 0 || len != sizeof(SDeleterRes)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
@ -282,7 +286,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
SRpcHandleInfo *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
@ -303,13 +306,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
switch (phase) { switch (phase) {
case QW_PHASE_PRE_QUERY: { case QW_PHASE_PRE_QUERY: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
break; break;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
@ -323,29 +326,29 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break; break;
} }
case QW_PHASE_PRE_FETCH: { case QW_PHASE_PRE_FETCH: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { if (!ctx->queryRsped) {
QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase)); QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
} }
break; break;
} }
case QW_PHASE_PRE_CQUERY: { case QW_PHASE_PRE_CQUERY: {
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
//qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
@ -374,11 +377,6 @@ _return:
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code));
} else { } else {
@ -400,7 +398,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
@ -409,10 +407,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
connInfo = ctx->ctrlConnInfo; connInfo = ctx->ctrlConnInfo;
rspConnection = &connInfo; rspConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); ctx->queryRsped = true;
} }
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
if (QW_PHASE_POST_FETCH == phase) { if (QW_PHASE_POST_FETCH == phase) {
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
@ -440,7 +438,7 @@ _return:
} }
if (rspConnection) { if (rspConnection) {
qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx ? &ctx->tbInfo : NULL); qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code));
} }
@ -501,7 +499,7 @@ _return:
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
@ -514,8 +512,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->taskType = taskType; ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = explain; ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->queryType = qwMsg->msgType; ctx->queryType = qwMsg->msgType;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
@ -585,7 +584,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
@ -622,7 +621,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
break; break;
} }
if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp); qwFreeFetchRsp(rsp);
rsp = NULL; rsp = NULL;
@ -686,7 +685,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = true; locked = true;
// RC WARNING // RC WARNING
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1); atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
@ -714,7 +713,7 @@ _return:
if (code || rsp) { if (code || rsp) {
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code),
dataLen); dataLen);
} }
@ -733,12 +732,12 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = true; locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping"); QW_TASK_WLOG_E("task already dropping");
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (QW_IS_QUERY_RUNNING(ctx)) { if (QW_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {

View File

@ -332,7 +332,7 @@ void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
qwtTestSinkQueryEnd = true; qwtTestSinkQueryEnd = true;
} }
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { void qwtGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) {
static int32_t in = 0; static int32_t in = 0;
if (in > 0) { if (in > 0) {

View File

@ -296,8 +296,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
@ -317,8 +317,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)

View File

@ -247,7 +247,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) { int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -879,7 +879,7 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_
} }
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
} }

View File

@ -21,44 +21,30 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = pTask->lastMsgType; int32_t lastMsgType = pTask->lastMsgType;
int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); int32_t taskStatus = SCH_GET_TASK_STATUS(pTask);
int32_t reqMsgType = msgType - 1; int32_t reqMsgType = (msgType & 1U) ? msgType : (msgType - 1);
switch (msgType) { switch (msgType) {
case TDMT_SCH_LINK_BROKEN: case TDMT_SCH_LINK_BROKEN:
case TDMT_SCH_EXPLAIN_RSP: case TDMT_SCH_EXPLAIN_RSP:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_SCH_MERGE_QUERY_RSP:
case TDMT_SCH_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
}
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
}
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_SCH_FETCH_RSP: case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) { if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_SCH_MERGE_QUERY_RSP:
case TDMT_SCH_QUERY_RSP:
case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_CREATE_TABLE_RSP:
case TDMT_VND_DROP_TABLE_RSP: case TDMT_VND_DROP_TABLE_RSP:
case TDMT_VND_ALTER_TABLE_RSP: case TDMT_VND_ALTER_TABLE_RSP:
@ -77,14 +63,12 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) { if (taskStatus != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType)); TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -98,7 +82,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId)); SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType)); SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) { if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {
@ -259,18 +243,25 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
} }
case TDMT_SCH_QUERY_RSP: case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: { case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
if (NULL == msg) { if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
rsp->code = ntohl(rsp->code);
rsp->sversion = ntohl(rsp->sversion);
rsp->tversion = ntohl(rsp->tversion);
rsp->affectedRows = be64toh(rsp->affectedRows);
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
taosMemoryFreeClear(msg); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break; break;
@ -1010,6 +1001,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->execId = htonl(pTask->execId); pMsg->execId = htonl(pTask->execId);
pMsg->taskType = TASK_TYPE_TEMP; pMsg->taskType = TASK_TYPE_TEMP;
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask);
pMsg->phyLen = htonl(pTask->msgLen); pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len); pMsg->sqlLen = htonl(len);

View File

@ -276,7 +276,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -313,7 +313,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
pTask->lastMsgType = 0; pTask->lastMsgType = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (pData) { if (pData) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
} }
@ -358,7 +358,7 @@ _return:
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if (NULL == pData->pEpSet) { if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
@ -492,7 +492,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false; *needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
@ -528,7 +528,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
if (SCH_IS_DATA_SRC_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode); SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else { } else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
@ -596,7 +596,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps); SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }

View File

@ -390,8 +390,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = pConn->inType + 1; pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
} else { } else {
if (smsg->type == Release) { if (smsg->type == Release) {
pHead->msgType = 0; pHead->msgType = 0;
@ -400,11 +401,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
destroyConnRegArg(pConn); destroyConnRegArg(pConn);
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
} else { } else {
pHead->msgType = pMsg->msgType;
// set up resp msg type // set up resp msg type
if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead)) { pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
pHead->msgType = pConn->inType + 1;
}
} }
} }

View File

@ -0,0 +1,45 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step1
sql drop database if exists db1;
sql create database db1 vgroups 3;
sql use db1;
sql create stable st1 (ts timestamp, f1 int, f2 binary(200)) tags(t1 int);
sql create table tb1 using st1 tags(1);
sql insert into tb1 values ('2022-07-07 10:01:01', 11, "aaa");
sql insert into tb1 values ('2022-07-07 11:01:02', 12, "bbb");
sql create table tb2 using st1 tags(2);
sql insert into tb2 values ('2022-07-07 10:02:01', 21, "aaa");
sql insert into tb2 values ('2022-07-07 11:02:02', 22, "bbb");
sql create table tb3 using st1 tags(3);
sql insert into tb3 values ('2022-07-07 10:03:01', 31, "aaa");
sql insert into tb3 values ('2022-07-07 11:03:02', 32, "bbb");
sql create table tb4 using st1 tags(4);
sql insert into tb4 select * from tb1;
sql select * from tb4;
if $rows != 2 then
return -1
endi
sql insert into tb4 select ts,f1,f2 from st1;
sql select * from tb4;
if $rows != 6 then
return -1
endi
sql create table tba (ts timestamp, f1 binary(10), f2 bigint, f3 double);
sql_error insert into tba select * from tb1;
sql insert into tba (ts,f2,f1) select * from tb1;
sql select * from tba;
if $rows != 2 then
return -1
endi
sql create table tbb (ts timestamp, f1 binary(10), f2 bigint, f3 double);
sql insert into tbb (f2,f1,ts) select f1+1,f2,ts+3 from tb2;
sql select * from tbb;
if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -163,8 +163,8 @@
# --- sma # --- sma
./test.sh -f tsim/sma/drop_sma.sim ./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim #./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim #./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind # --- valgrind
./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError1.sim