Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/alter_table
This commit is contained in:
commit
94cea61c97
|
@ -1271,7 +1271,6 @@ _error:
|
||||||
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
|
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
|
||||||
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
|
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
|
||||||
int32_t start, int32_t end);
|
int32_t start, int32_t end);
|
||||||
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
|
|
||||||
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
|
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
|
||||||
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
|
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
|
||||||
SDataBlockInfo* pBlockInfo, int32_t endPos);
|
SDataBlockInfo* pBlockInfo, int32_t endPos);
|
||||||
|
@ -1301,7 +1300,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
|
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
|
||||||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
|
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
|
||||||
// do not load file block into buffer
|
// do not load file block into buffer
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
int32_t step = ascScan ? 1 : -1;
|
||||||
|
|
||||||
TSKEY maxKey =
|
TSKEY maxKey =
|
||||||
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
|
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
|
||||||
|
@ -1790,22 +1789,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
|
|
||||||
if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
if (numOfRows < pTsdbReadHandle->outputCapacity) {
|
|
||||||
int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
|
|
||||||
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
|
|
||||||
numOfRows * pColInfo->info.bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
|
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
|
||||||
int32_t numOfExisted, int32_t* start, int32_t* end) {
|
int32_t numOfExisted, int32_t* start, int32_t* end) {
|
||||||
*start = -1;
|
*start = -1;
|
||||||
|
@ -1891,9 +1874,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
|
||||||
cur->lastKey = tsArray[endPos] + step;
|
cur->lastKey = tsArray[endPos] + step;
|
||||||
cur->blockCompleted = true;
|
cur->blockCompleted = true;
|
||||||
|
|
||||||
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
|
||||||
moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
|
|
||||||
|
|
||||||
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
|
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
|
||||||
pos = endPos + step;
|
pos = endPos + step;
|
||||||
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
||||||
|
@ -1944,18 +1924,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
|
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
|
||||||
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
|
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
|
||||||
|
|
||||||
|
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
|
||||||
|
int32_t step = ascScan ? 1 : -1;
|
||||||
|
|
||||||
// for search the endPos, so the order needs to reverse
|
// for search the endPos, so the order needs to reverse
|
||||||
int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
||||||
|
|
||||||
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
|
|
||||||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
|
||||||
|
|
||||||
STable* pTable = NULL;
|
|
||||||
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
|
||||||
|
|
||||||
|
STimeWindow* pWin = &blockInfo.window;
|
||||||
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
|
||||||
" rows:%d, start:%d, end:%d, %s",
|
" rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows,
|
||||||
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
|
|
||||||
cur->pos, endPos, pTsdbReadHandle->idStr);
|
cur->pos, endPos, pTsdbReadHandle->idStr);
|
||||||
|
|
||||||
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
// compared with the data from in-memory buffer, to generate the correct timestamp array list
|
||||||
|
@ -1986,20 +1966,16 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY key = TD_ROW_KEY(row1);
|
TSKEY key = TD_ROW_KEY(row1);
|
||||||
if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
|
||||||
(key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) &&
|
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
|
||||||
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
|
||||||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
|
|
||||||
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
|
||||||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
|
|
||||||
if (rv1 != TD_ROW_SVER(row1)) {
|
if (rv1 != TD_ROW_SVER(row1)) {
|
||||||
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
|
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
|
||||||
rv1 = TD_ROW_SVER(row1);
|
rv1 = TD_ROW_SVER(row1);
|
||||||
|
@ -2054,11 +2030,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
if (TD_SUPPORT_UPDATE(pCfg->update)) {
|
if (TD_SUPPORT_UPDATE(pCfg->update)) {
|
||||||
if (lastKeyAppend != key) {
|
|
||||||
lastKeyAppend = key;
|
|
||||||
++curRow;
|
|
||||||
}
|
|
||||||
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
|
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
|
||||||
|
lastKeyAppend = key;
|
||||||
|
|
||||||
if (rv1 != TD_ROW_SVER(row1)) {
|
if (rv1 != TD_ROW_SVER(row1)) {
|
||||||
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
|
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
|
||||||
|
@ -2068,7 +2041,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
|
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
|
||||||
rv2 = TD_ROW_SVER(row2);
|
rv2 = TD_ROW_SVER(row2);
|
||||||
}
|
}
|
||||||
numOfRows +=
|
|
||||||
|
// still assign data into current row
|
||||||
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
|
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
|
||||||
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
|
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
|
||||||
|
|
||||||
|
@ -2081,12 +2055,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
cur->mixBlock = true;
|
cur->mixBlock = true;
|
||||||
|
|
||||||
moveToNextRowInMem(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
|
++curRow;
|
||||||
|
|
||||||
pos += step;
|
pos += step;
|
||||||
} else {
|
} else {
|
||||||
moveToNextRowInMem(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
}
|
}
|
||||||
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
} else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
|
||||||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
|
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = tsArray[pos];
|
cur->win.skey = tsArray[pos];
|
||||||
}
|
}
|
||||||
|
@ -2112,17 +2087,17 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
int32_t qstart = 0, qend = 0;
|
int32_t qstart = 0, qend = 0;
|
||||||
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
|
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
|
||||||
|
|
||||||
if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
|
if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
|
||||||
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
|
|
||||||
++curRow;
|
++curRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
|
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
|
||||||
pos += (qend - qstart + 1) * step;
|
pos += (qend - qstart + 1) * step;
|
||||||
if (numOfRows > 0) {
|
if (numOfRows > 0) {
|
||||||
curRow = numOfRows - 1;
|
curRow = numOfRows - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
|
cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
|
||||||
cur->lastKey = cur->win.ekey + step;
|
cur->lastKey = cur->win.ekey + step;
|
||||||
lastKeyAppend = cur->win.ekey;
|
lastKeyAppend = cur->win.ekey;
|
||||||
}
|
}
|
||||||
|
@ -2134,10 +2109,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
* copy them all to result buffer, since it may be overlapped with file data block.
|
* copy them all to result buffer, since it may be overlapped with file data block.
|
||||||
*/
|
*/
|
||||||
if (node == NULL ||
|
if (node == NULL ||
|
||||||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
|
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
|
||||||
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
|
||||||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
|
|
||||||
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
|
|
||||||
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
|
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = tsArray[pos];
|
cur->win.skey = tsArray[pos];
|
||||||
|
@ -2149,22 +2122,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
|
||||||
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
|
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
|
||||||
pos += (end - start + 1) * step;
|
pos += (end - start + 1) * step;
|
||||||
|
|
||||||
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start];
|
cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
|
||||||
cur->lastKey = cur->win.ekey + step;
|
cur->lastKey = cur->win.ekey + step;
|
||||||
cur->mixBlock = true;
|
cur->mixBlock = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cur->blockCompleted =
|
cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
|
||||||
(((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
|
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
|
||||||
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
|
|
||||||
|
|
||||||
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
|
if (!ascScan) {
|
||||||
TSWAP(cur->win.skey, cur->win.ekey);
|
TSWAP(cur->win.skey, cur->win.ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
|
|
||||||
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
|
||||||
doCheckGeneratedBlockRange(pTsdbReadHandle);
|
doCheckGeneratedBlockRange(pTsdbReadHandle);
|
||||||
|
|
||||||
|
|
|
@ -137,6 +137,8 @@ static void destroySmsg(SSrvMsg* smsg);
|
||||||
// check whether already read complete packet
|
// check whether already read complete packet
|
||||||
static SSrvConn* createConn(void* hThrd);
|
static SSrvConn* createConn(void* hThrd);
|
||||||
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
|
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
|
||||||
|
static void destroyConnRegArg(SSrvConn* conn);
|
||||||
|
|
||||||
static int reallocConnRefHandle(SSrvConn* conn);
|
static int reallocConnRefHandle(SSrvConn* conn);
|
||||||
|
|
||||||
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
|
@ -429,6 +431,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
if (smsg->type == Release) {
|
if (smsg->type == Release) {
|
||||||
pHead->msgType = 0;
|
pHead->msgType = 0;
|
||||||
pConn->status = ConnNormal;
|
pConn->status = ConnNormal;
|
||||||
|
|
||||||
|
destroyConnRegArg(pConn);
|
||||||
transUnrefSrvHandle(pConn);
|
transUnrefSrvHandle(pConn);
|
||||||
} else {
|
} else {
|
||||||
pHead->msgType = pMsg->msgType;
|
pHead->msgType = pMsg->msgType;
|
||||||
|
@ -800,6 +804,12 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
|
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void destroyConnRegArg(SSrvConn* conn) {
|
||||||
|
if (conn->regArg.init == 1) {
|
||||||
|
transFreeMsg(conn->regArg.msg.pCont);
|
||||||
|
conn->regArg.init = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
static int reallocConnRefHandle(SSrvConn* conn) {
|
static int reallocConnRefHandle(SSrvConn* conn) {
|
||||||
uvReleaseExHandle(conn->refId);
|
uvReleaseExHandle(conn->refId);
|
||||||
uvRemoveExHandle(conn->refId);
|
uvRemoveExHandle(conn->refId);
|
||||||
|
@ -827,16 +837,9 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
// uv_timer_stop(&conn->pTimer);
|
// uv_timer_stop(&conn->pTimer);
|
||||||
transQueueDestroy(&conn->srvMsgs);
|
transQueueDestroy(&conn->srvMsgs);
|
||||||
|
|
||||||
if (conn->regArg.init == 1) {
|
|
||||||
transFreeMsg(conn->regArg.msg.pCont);
|
|
||||||
conn->regArg.init = 0;
|
|
||||||
}
|
|
||||||
QUEUE_REMOVE(&conn->queue);
|
QUEUE_REMOVE(&conn->queue);
|
||||||
taosMemoryFree(conn->pTcp);
|
taosMemoryFree(conn->pTcp);
|
||||||
if (conn->regArg.init == 1) {
|
destroyConnRegArg(conn);
|
||||||
transFreeMsg(conn->regArg.msg.pCont);
|
|
||||||
conn->regArg.init = 0;
|
|
||||||
}
|
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
|
|
Loading…
Reference in New Issue