more code
This commit is contained in:
parent
52a04cd008
commit
ee4dda56c1
|
@ -557,13 +557,10 @@ void tRowDestroy(SRow *pRow) {
|
|||
}
|
||||
|
||||
static int32_t tRowPCmprFn(const void *p1, const void *p2) {
|
||||
if ((*(SRow **)p1)->ts < (*(SRow **)p2)->ts) {
|
||||
return -1;
|
||||
} else if ((*(SRow **)p1)->ts > (*(SRow **)p2)->ts) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
SRowKey key1, key2;
|
||||
tRowGetKey(*(SRow **)p1, &key1);
|
||||
tRowGetKey(*(SRow **)p2, &key2);
|
||||
return tRowKeyCompare(&key1, &key2);
|
||||
}
|
||||
static void tRowPDestroy(SRow **ppRow) { tRowDestroy(*ppRow); }
|
||||
static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) {
|
||||
|
@ -645,13 +642,18 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {
|
|||
|
||||
int32_t iStart = 0;
|
||||
while (iStart < aRowP->size) {
|
||||
SRow *pRow = (SRow *)taosArrayGetP(aRowP, iStart);
|
||||
SRowKey key1;
|
||||
SRow *row1 = (SRow *)taosArrayGetP(aRowP, iStart);
|
||||
|
||||
tRowGetKey(row1, &key1);
|
||||
|
||||
int32_t iEnd = iStart + 1;
|
||||
while (iEnd < aRowP->size) {
|
||||
SRow *pRowT = (SRow *)taosArrayGetP(aRowP, iEnd);
|
||||
SRowKey key2;
|
||||
SRow *row2 = (SRow *)taosArrayGetP(aRowP, iEnd);
|
||||
tRowGetKey(row2, &key2);
|
||||
|
||||
if (pRow->ts != pRowT->ts) break;
|
||||
if (tRowKeyCompare(&key1, &key2) != 0) break;
|
||||
|
||||
iEnd++;
|
||||
}
|
||||
|
|
|
@ -32,14 +32,14 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
|
|||
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginRpc);
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginalMsg);
|
||||
SRpcMsg *pOriginalMsg);
|
||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginalMsg);
|
||||
SRpcMsg *pOriginalMsg);
|
||||
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
@ -1516,7 +1516,7 @@ static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
|
|||
}
|
||||
|
||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginalMsg) {
|
||||
SRpcMsg *pOriginalMsg) {
|
||||
int32_t code = 0;
|
||||
terrno = 0;
|
||||
|
||||
|
@ -1586,12 +1586,26 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
|||
} else {
|
||||
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
||||
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
||||
SRowKey lastRowKey;
|
||||
for (int32_t iRow = 0; iRow < nRow; ++iRow) {
|
||||
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey || (iRow > 0 && aRow[iRow]->ts <= aRow[iRow - 1]->ts)) {
|
||||
if (aRow[iRow]->ts < minKey || aRow[iRow]->ts > maxKey) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
||||
goto _exit;
|
||||
}
|
||||
if (iRow == 0) {
|
||||
tRowGetKey(aRow[iRow], &lastRowKey);
|
||||
} else {
|
||||
SRowKey rowKey;
|
||||
tRowGetKey(aRow[iRow], &rowKey);
|
||||
|
||||
if (tRowKeyCompare(&lastRowKey, &rowKey) >= 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), ver);
|
||||
goto _exit;
|
||||
}
|
||||
lastRowKey = rowKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1735,10 +1749,14 @@ _exit:
|
|||
atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows);
|
||||
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
|
||||
|
||||
if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){
|
||||
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId,
|
||||
pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId,
|
||||
pOriginalMsg->info.conn.user, "Success"};
|
||||
if (tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0) {
|
||||
const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS,
|
||||
pVnode->monitor.strClusterId,
|
||||
pVnode->monitor.strDnodeId,
|
||||
tsLocalEp,
|
||||
pVnode->monitor.strVgId,
|
||||
pOriginalMsg->info.conn.user,
|
||||
"Success"};
|
||||
taos_counter_add(pVnode->monitor.insertCounter, pSubmitRsp->affectedRows, sample_labels);
|
||||
}
|
||||
|
||||
|
@ -2019,7 +2037,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
|
|||
}
|
||||
|
||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginalMsg) {
|
||||
SRpcMsg *pOriginalMsg) {
|
||||
int32_t code = 0;
|
||||
SDecoder *pCoder = &(SDecoder){0};
|
||||
SDeleteRes *pRes = &(SDeleteRes){0};
|
||||
|
|
Loading…
Reference in New Issue