Merge remote-tracking branch 'origin/refact/submit_req' into refact/submit_req
This commit is contained in:
commit
b1d4993a0e
|
@ -2216,13 +2216,18 @@ static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
|
|||
if (nData1 > nData2) {
|
||||
memcpy(pData, pColData->pData + pColData->aOffset[i], nData1);
|
||||
memcpy(pColData->pData + pColData->aOffset[i], pColData->pData + pColData->aOffset[j], nData2);
|
||||
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1,
|
||||
// memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i] + nData1,
|
||||
// pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
||||
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1],
|
||||
pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
||||
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pData, nData1);
|
||||
} else {
|
||||
memcpy(pData, pColData->pData + pColData->aOffset[j], nData2);
|
||||
memcpy(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i], nData1);
|
||||
memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] + nData1,
|
||||
// memmove(pColData->pData + pColData->aOffset[j] + nData2 - nData1, pColData->pData + pColData->aOffset[i] +
|
||||
// nData1,
|
||||
// pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
||||
memmove(pColData->pData + pColData->aOffset[i] + nData2, pColData->pData + pColData->aOffset[i + 1],
|
||||
pColData->aOffset[j] - pColData->aOffset[i + 1]);
|
||||
memcpy(pColData->pData + pColData->aOffset[i], pData, nData2);
|
||||
}
|
||||
|
@ -2242,6 +2247,7 @@ static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
|
|||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
|
||||
ASSERT(i < j);
|
||||
ASSERT(j < pColData->nVal);
|
||||
|
@ -2277,9 +2283,173 @@ static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tColDataSort(SColData *aColData, int32_t nColData) {
|
||||
if (aColData[0].nVal == 0) return;
|
||||
// TODO
|
||||
static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pToColData->type)) {
|
||||
int32_t nData = (iFromRow < pFromColData->nVal - 1)
|
||||
? pFromColData->aOffset[iFromRow + 1] - pFromColData->aOffset[iFromRow]
|
||||
: pFromColData->nData - pFromColData->aOffset[iFromRow];
|
||||
if (iToRow == 0) {
|
||||
pToColData->aOffset[iToRow] = 0;
|
||||
}
|
||||
|
||||
if (iToRow < pToColData->nVal - 1) {
|
||||
pToColData->aOffset[iToRow + 1] = pToColData->aOffset[iToRow] + nData;
|
||||
}
|
||||
|
||||
memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow],
|
||||
nData);
|
||||
} else {
|
||||
memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow],
|
||||
&pFromColData->pData[TYPE_BYTES[pToColData->type] * iFromRow], TYPE_BYTES[pToColData->type]);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tColDataCopyRowSingleCol(SColData *pFromColData, int32_t iFromRow, SColData *pToColData,
|
||||
int32_t iToRow) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
switch (pFromColData->flag) {
|
||||
case HAS_NONE:
|
||||
case HAS_NULL:
|
||||
break;
|
||||
case (HAS_NULL | HAS_NONE): {
|
||||
SET_BIT1(pToColData->pBitMap, iToRow, GET_BIT1(pFromColData->pBitMap, iFromRow));
|
||||
} break;
|
||||
case HAS_VALUE: {
|
||||
tColDataCopyRowCell(pFromColData, iFromRow, pToColData, iToRow);
|
||||
} break;
|
||||
case (HAS_VALUE | HAS_NONE):
|
||||
case (HAS_VALUE | HAS_NULL): {
|
||||
SET_BIT1(pToColData->pBitMap, iToRow, GET_BIT1(pFromColData->pBitMap, iFromRow));
|
||||
tColDataCopyRowCell(pFromColData, iFromRow, pToColData, iToRow);
|
||||
} break;
|
||||
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
|
||||
SET_BIT2(pToColData->pBitMap, iToRow, GET_BIT2(pFromColData->pBitMap, iFromRow));
|
||||
tColDataCopyRowCell(pFromColData, iFromRow, pToColData, iToRow);
|
||||
} break;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tColDataCopyRow(SColData *aFromColData, int32_t iFromRow, SColData *aToColData, int32_t iToRow,
|
||||
int32_t nColData) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < nColData; i++) {
|
||||
code = tColDataCopyRowSingleCol(&aFromColData[i], iFromRow, &aToColData[i], iToRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, SColData *aToColData, int32_t nColData) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < nColData; i++) {
|
||||
SColVal cv = {0};
|
||||
tColDataGetValue(&aFromColData[i], iFromRow, &cv);
|
||||
code = tColDataAppendValue(&aToColData[i], &cv);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t mid, int32_t end, int32_t nColData) {
|
||||
SColData *aDstColData = NULL;
|
||||
TSKEY *aKey = (TSKEY *)aColData[0].pData;
|
||||
|
||||
int32_t i = start, j = mid + 1, k = 0;
|
||||
|
||||
if (end > start) {
|
||||
aDstColData = taosMemoryCalloc(1, sizeof(SColData) * nColData);
|
||||
for (int c = 0; c < nColData; ++c) {
|
||||
tColDataInit(&aDstColData[c], aColData[c].cid, aColData[c].type, aColData[c].smaOn);
|
||||
}
|
||||
if (aDstColData == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
/*
|
||||
for (int32_t i = 0; i < nColData; i++) {
|
||||
tColDataCopy(&aColData[i], &aDstColData[i], tColDataDefaultMalloc, NULL);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
while (i <= mid && j <= end) {
|
||||
if (aKey[i] <= aKey[j]) {
|
||||
// tColDataCopyRow(aColData, i++, aDstColData, k++);
|
||||
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
|
||||
} else {
|
||||
// tColDataCopyRow(aColData, j++, aDstColData, k++);
|
||||
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
|
||||
}
|
||||
}
|
||||
|
||||
while (i <= mid) {
|
||||
// tColDataCopyRow(aColData, i++, aDstColData, k++);
|
||||
tColDataCopyRowAppend(aColData, i++, aDstColData, nColData);
|
||||
}
|
||||
|
||||
while (j <= end) {
|
||||
// tColDataCopyRow(aColData, j++, aDstColData, k++);
|
||||
tColDataCopyRowAppend(aColData, j++, aDstColData, nColData);
|
||||
}
|
||||
|
||||
for (i = start, k = 0; i <= end; ++i, ++k) {
|
||||
tColDataCopyRow(aDstColData, k, aColData, i, nColData);
|
||||
}
|
||||
|
||||
if (aDstColData) {
|
||||
for (int32_t i = 0; i < nColData; i++) {
|
||||
tColDataDestroy(&aDstColData[i]);
|
||||
}
|
||||
taosMemoryFree(aDstColData);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tColDataMergeSort(SColData *aColData, int32_t start, int32_t end, int32_t nColData) {
|
||||
int32_t ret = TSDB_CODE_SUCCESS;
|
||||
int32_t mid;
|
||||
|
||||
if (start >= end) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
mid = (start + end) / 2;
|
||||
|
||||
ret = tColDataMergeSort(aColData, start, mid, nColData);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = tColDataMergeSort(aColData, mid + 1, end, nColData);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
return tColDataMergeSortMerge(aColData, start, mid, end, nColData);
|
||||
}
|
||||
|
||||
static int32_t tColDataSort(SColData *aColData, int32_t nColData) {
|
||||
int32_t nVal = aColData[0].nVal;
|
||||
|
||||
if (nVal < 2) return TSDB_CODE_SUCCESS;
|
||||
|
||||
return tColDataMergeSort(aColData, 0, nVal - 1, nColData);
|
||||
}
|
||||
static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /* not included */) {
|
||||
switch (pColData->flag) {
|
||||
|
@ -2529,6 +2699,15 @@ void tColDataSortMerge(SArray *colDataArr) {
|
|||
tColDataSort(aColData, nColData);
|
||||
}
|
||||
|
||||
if (doMerge != 1) {
|
||||
for (int32_t iVal = 1; iVal < aColData[0].nVal; ++iVal) {
|
||||
if (aKey[iVal] == aKey[iVal - 1]) {
|
||||
doMerge = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// merge -------
|
||||
if (doMerge) {
|
||||
tColDataMerge(aColData, nColData);
|
||||
|
|
|
@ -12,6 +12,7 @@ from util.dnodes import *
|
|||
from util.dnodes import TDDnodes
|
||||
from util.dnodes import TDDnode
|
||||
from util.cluster import *
|
||||
import subprocess
|
||||
|
||||
BASEVERSION = "3.0.1.8"
|
||||
class TDTestCase:
|
||||
|
@ -26,8 +27,21 @@ class TDTestCase:
|
|||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
|
||||
def checkProcessPid(self,processName):
|
||||
i=0
|
||||
while i<60:
|
||||
print(f"wait stop {processName}")
|
||||
processPid = subprocess.getstatusoutput(f'ps aux|grep {processName} |grep -v "grep"|awk \'{{print $2}}\'')[1]
|
||||
print(f"times:{i},{processName}-pid:{processPid}")
|
||||
if(processPid == ""):
|
||||
break
|
||||
i += 1
|
||||
sleep(1)
|
||||
else:
|
||||
print(f'this processName is not stoped in 60s')
|
||||
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
|
@ -115,7 +129,7 @@ class TDTestCase:
|
|||
# tdsqlF.query(f"select count(*) from {stb}")
|
||||
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
os.system("pkill taosd")
|
||||
sleep(2)
|
||||
self.checkProcessPid("taosd")
|
||||
|
||||
print(f"start taosd: nohup taosd -c {cPath} & ")
|
||||
os.system(f" nohup taosd -c {cPath} & " )
|
||||
|
@ -123,7 +137,7 @@ class TDTestCase:
|
|||
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
|
||||
os.system("pkill taosd") # make sure all the data are saved in disk.
|
||||
|
||||
self.checkProcessPid("taosd")
|
||||
|
||||
|
||||
tdLog.printNoPrefix("==========step2:update new version ")
|
||||
|
|
Loading…
Reference in New Issue