refactor: move compress data in query threads.
This commit is contained in:
parent
4aeb3c5c90
commit
92b247e5b4
|
@ -104,7 +104,7 @@ void dsReset(DataSinkHandle handle);
|
||||||
* @param handle
|
* @param handle
|
||||||
* @param pLen data length
|
* @param pLen data length
|
||||||
*/
|
*/
|
||||||
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd);
|
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get data, the caller needs to allocate data memory.
|
* Get data, the caller needs to allocate data memory.
|
||||||
|
|
|
@ -2217,13 +2217,14 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
|
|
||||||
int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen,
|
int32_t len = tsDecompressString((void*)pRsp->data, htonl(pRsp->compLen), 1, pResultInfo->decompressBuf, payloadLen,
|
||||||
ONE_STAGE_COMP, NULL, 0);
|
ONE_STAGE_COMP, NULL, 0);
|
||||||
ASSERT(len == payloadLen);
|
// ASSERT(len == payloadLen);
|
||||||
|
|
||||||
pResultInfo->pData = pResultInfo->decompressBuf;
|
pResultInfo->pData = pResultInfo->decompressBuf;
|
||||||
pResultInfo->payloadLen = payloadLen;
|
pResultInfo->payloadLen = payloadLen;
|
||||||
} else {
|
} else {
|
||||||
pResultInfo->pData = (void*)pRsp->data;
|
pResultInfo->pData = (void*)pRsp->data;
|
||||||
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
pResultInfo->payloadLen = htonl(pRsp->compLen);
|
||||||
|
ASSERT(pRsp->compLen == pRsp->payloadLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO handle the compressed case
|
// TODO handle the compressed case
|
||||||
|
|
|
@ -821,32 +821,21 @@ TEST(clientCase, projection_query_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = NULL;
|
||||||
|
|
||||||
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||||
// if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
// }
|
// }
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
/*
|
||||||
TAOS_RES* pRes = taos_query(pConn, "alter local 'fqdn 127.0.0.1'");
|
TAOS_RES* pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
printf("failed to exec query, %s\n", taos_errstr(pRes));
|
|
||||||
}
|
|
||||||
|
|
||||||
taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1");
|
|
||||||
// pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1");
|
// pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to exec query, %s\n", taos_errstr(pRes));
|
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
// pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);");
|
|
||||||
// if (taos_errno(pRes) != 0) {
|
|
||||||
// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
|
||||||
// }
|
|
||||||
// taos_free_result(pRes);
|
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tu using st2 tags(2)");
|
pRes = taos_query(pConn, "create table tu using st2 tags(2)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -876,27 +865,27 @@ TEST(clientCase, projection_query_tables) {
|
||||||
for(int32_t j = 0; j < 1; ++j) {
|
for(int32_t j = 0; j < 1; ++j) {
|
||||||
start += 20;
|
start += 20;
|
||||||
for (int32_t i = 0; i < 1; ++i) {
|
for (int32_t i = 0; i < 1; ++i) {
|
||||||
createNewTable(pConn, i, 100, start, pstr);
|
createNewTable(pConn, i, 100000, 0, pstr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
//
|
pRes = taos_query(pConn, "select * from abc1.st2");
|
||||||
// pRes = taos_query(pConn, "select * from tu");
|
if (taos_errno(pRes) != 0) {
|
||||||
// if (taos_errno(pRes) != 0) {
|
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||||
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
taos_free_result(pRes);
|
||||||
// taos_free_result(pRes);
|
ASSERT_TRUE(false);
|
||||||
// ASSERT_TRUE(false);
|
}
|
||||||
// }
|
|
||||||
//
|
TAOS_ROW pRow = NULL;
|
||||||
// TAOS_ROW pRow = NULL;
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
// int32_t numOfFields = taos_num_fields(pRes);
|
|
||||||
//
|
char str[512] = {0};
|
||||||
// char str[512] = {0};
|
while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||||
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
// printf("%s\n", str);
|
||||||
// printf("%s\n", str);
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
|
|
|
@ -1957,6 +1957,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||||
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
|
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
|
||||||
|
|
||||||
rsp->compLen = htonl(len);
|
rsp->compLen = htonl(len);
|
||||||
|
rsp->payloadLen = rsp->compLen;
|
||||||
|
|
||||||
blockDataDestroy(pBlock);
|
blockDataDestroy(pBlock);
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ typedef struct SDataSinkManager {
|
||||||
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
||||||
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
|
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
|
||||||
typedef void (*FReset)(struct SDataSinkHandle* pHandle);
|
typedef void (*FReset)(struct SDataSinkHandle* pHandle);
|
||||||
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd);
|
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd);
|
||||||
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
||||||
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
||||||
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
|
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
|
||||||
|
|
|
@ -154,7 +154,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
|
||||||
taosThreadMutexUnlock(&pDeleter->mutex);
|
taosThreadMutexUnlock(&pDeleter->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
|
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
|
||||||
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||||
if (taosQueueEmpty(pDeleter->pDataBlocks)) {
|
if (taosQueueEmpty(pDeleter->pDataBlocks)) {
|
||||||
*pQueryEnd = pDeleter->queryEnd;
|
*pQueryEnd = pDeleter->queryEnd;
|
||||||
|
@ -171,6 +171,8 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
|
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
|
||||||
*pLen = pEntry->dataLen;
|
*pLen = pEntry->dataLen;
|
||||||
|
*pRawLen = pEntry->dataLen;
|
||||||
|
|
||||||
*pQueryEnd = pDeleter->queryEnd;
|
*pQueryEnd = pDeleter->queryEnd;
|
||||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
||||||
((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
||||||
|
@ -186,6 +188,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
pOutput->queryEnd = pDeleter->queryEnd;
|
pOutput->queryEnd = pDeleter->queryEnd;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
|
||||||
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||||
pDeleter->pParam->pUidList = NULL;
|
pDeleter->pParam->pUidList = NULL;
|
||||||
|
|
|
@ -31,6 +31,7 @@ typedef struct SDataDispatchBuf {
|
||||||
} SDataDispatchBuf;
|
} SDataDispatchBuf;
|
||||||
|
|
||||||
typedef struct SDataCacheEntry {
|
typedef struct SDataCacheEntry {
|
||||||
|
int32_t rawLen;
|
||||||
int32_t dataLen;
|
int32_t dataLen;
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
|
@ -78,10 +79,26 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
||||||
pEntry->dataLen = 0;
|
pEntry->dataLen = 0;
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
|
|
||||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||||
|
|
||||||
|
{
|
||||||
|
if (pBuf->allocSize > 8192) {
|
||||||
|
char* p = taosMemoryMalloc(pBuf->allocSize);
|
||||||
|
int32_t dataLen = blockEncode(pInput->pData, p, numOfCols);
|
||||||
|
|
||||||
|
int32_t len = tsCompressString(p, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
|
||||||
|
|
||||||
|
pEntry->compressed = 1;
|
||||||
|
pEntry->dataLen = len;
|
||||||
|
pEntry->rawLen = dataLen;
|
||||||
|
taosMemoryFree(p);
|
||||||
|
} else {
|
||||||
|
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
|
||||||
|
pEntry->rawLen = pEntry->dataLen;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pBuf->useSize += pEntry->dataLen;
|
pBuf->useSize += pEntry->dataLen;
|
||||||
|
|
||||||
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
|
||||||
|
@ -165,7 +182,7 @@ static void resetDispatcher(struct SDataSinkHandle* pHandle) {
|
||||||
taosThreadMutexUnlock(&pDispatcher->mutex);
|
taosThreadMutexUnlock(&pDispatcher->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
|
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRowLen, bool* pQueryEnd) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||||
*pQueryEnd = pDispatcher->queryEnd;
|
*pQueryEnd = pDispatcher->queryEnd;
|
||||||
|
@ -182,9 +199,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
|
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
|
||||||
*pLen = pEntry->dataLen;
|
*pLen = pEntry->dataLen;
|
||||||
|
*pRowLen = pEntry->rawLen;
|
||||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
|
||||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
|
||||||
|
|
||||||
*pQueryEnd = pDispatcher->queryEnd;
|
*pQueryEnd = pDispatcher->queryEnd;
|
||||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
|
||||||
|
@ -202,6 +217,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
pOutput->queryEnd = pDispatcher->queryEnd;
|
pOutput->queryEnd = pDispatcher->queryEnd;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
|
||||||
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
|
||||||
pOutput->numOfRows = pEntry->numOfRows;
|
pOutput->numOfRows = pEntry->numOfRows;
|
||||||
|
|
|
@ -370,7 +370,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
|
||||||
taosThreadMutexUnlock(&pInserter->mutex);
|
taosThreadMutexUnlock(&pInserter->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
|
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
|
||||||
SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
|
SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
|
||||||
*pLen = pDispatcher->submitRes.affectedRows;
|
*pLen = pDispatcher->submitRes.affectedRows;
|
||||||
qDebug("got total affectedRows %" PRId64, *pLen);
|
qDebug("got total affectedRows %" PRId64, *pLen);
|
||||||
|
|
|
@ -75,9 +75,9 @@ void dsReset(DataSinkHandle handle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) {
|
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd);
|
pHandleImpl->fGetLen(pHandleImpl, pLen, pRawLen, pQueryEnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
|
||||||
|
|
|
@ -38,7 +38,7 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
|
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
|
||||||
int32_t code);
|
int32_t code);
|
||||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete);
|
||||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
|
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx);
|
||||||
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList);
|
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList);
|
||||||
|
|
|
@ -29,13 +29,14 @@ int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **r
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) {
|
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete) {
|
||||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
||||||
|
|
||||||
rsp->useconds = htobe64(input->useconds);
|
rsp->useconds = htobe64(input->useconds);
|
||||||
rsp->completed = qComplete;
|
rsp->completed = qComplete;
|
||||||
rsp->precision = input->precision;
|
rsp->precision = input->precision;
|
||||||
rsp->compressed = input->compressed;
|
rsp->compressed = input->compressed;
|
||||||
|
rsp->payloadLen = htonl(rawDataLen);
|
||||||
rsp->compLen = htonl(len);
|
rsp->compLen = htonl(len);
|
||||||
rsp->numOfRows = htobe64(input->numOfRows);
|
rsp->numOfRows = htobe64(input->numOfRows);
|
||||||
rsp->numOfCols = htonl(input->numOfCols);
|
rsp->numOfCols = htonl(input->numOfCols);
|
||||||
|
|
|
@ -102,7 +102,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!ctx->needFetch) {
|
if (!ctx->needFetch) {
|
||||||
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL);
|
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,8 +285,10 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t *pRawDataLen, void **rspMsg,
|
||||||
|
SOutputData *pOutput) {
|
||||||
int64_t len = 0;
|
int64_t len = 0;
|
||||||
|
int64_t rawLen = 0;
|
||||||
SRetrieveTableRsp *rsp = NULL;
|
SRetrieveTableRsp *rsp = NULL;
|
||||||
bool queryEnd = false;
|
bool queryEnd = false;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -300,7 +302,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
*dataLen = 0;
|
*dataLen = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
|
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
|
||||||
|
|
||||||
if (len < 0) {
|
if (len < 0) {
|
||||||
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
|
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
|
||||||
|
@ -343,6 +345,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len);
|
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64 "", len);
|
||||||
|
|
||||||
*dataLen += len;
|
*dataLen += len;
|
||||||
|
*pRawDataLen += rawLen;
|
||||||
|
|
||||||
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));
|
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));
|
||||||
|
|
||||||
|
@ -375,11 +378,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
// if (pOutput->numOfRows >= QW_MIN_RES_ROWS) {
|
||||||
QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
|
// QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks,
|
||||||
pOutput->numOfRows);
|
// pOutput->numOfRows);
|
||||||
break;
|
break;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
*rspMsg = rsp;
|
*rspMsg = rsp;
|
||||||
|
@ -389,11 +392,12 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
||||||
|
|
||||||
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
|
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
|
||||||
int64_t len = 0;
|
int64_t len = 0;
|
||||||
|
int64_t rawLen = 0;
|
||||||
bool queryEnd = false;
|
bool queryEnd = false;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SOutputData output = {0};
|
SOutputData output = {0};
|
||||||
|
|
||||||
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
|
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
|
||||||
|
|
||||||
if (len <= 0 || len != sizeof(SDeleterRes)) {
|
if (len <= 0 || len != sizeof(SDeleterRes)) {
|
||||||
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
|
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
|
||||||
|
@ -433,9 +437,10 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
int32_t rawLen = 0;
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput);
|
code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
if (NULL == rsp && TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -445,7 +450,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
|
||||||
if (NULL != rsp) {
|
if (NULL != rsp) {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
|
||||||
if (qComplete) {
|
if (qComplete) {
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
}
|
}
|
||||||
|
@ -791,6 +796,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
SQWPhaseInput input = {0};
|
SQWPhaseInput input = {0};
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
int32_t rawLen = 0;
|
||||||
bool queryStop = false;
|
bool queryStop = false;
|
||||||
bool qComplete = false;
|
bool qComplete = false;
|
||||||
|
|
||||||
|
@ -810,7 +816,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput));
|
||||||
|
|
||||||
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
||||||
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
|
QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus));
|
||||||
|
@ -821,7 +827,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
if (rsp) {
|
if (rsp) {
|
||||||
qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
|
||||||
if (qComplete) {
|
if (qComplete) {
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
atomic_store_8((int8_t *)&ctx->queryContinue, 0);
|
atomic_store_8((int8_t *)&ctx->queryContinue, 0);
|
||||||
|
@ -878,6 +884,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
int32_t rawDataLen = 0;
|
||||||
|
|
||||||
bool locked = false;
|
bool locked = false;
|
||||||
SQWTaskCtx *ctx = NULL;
|
SQWTaskCtx *ctx = NULL;
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
|
@ -896,17 +904,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawDataLen, &rsp, &sOutput));
|
||||||
|
|
||||||
if (NULL == rsp) {
|
if (NULL == rsp) {
|
||||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
|
||||||
} else {
|
} else {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
|
||||||
|
|
||||||
{
|
{
|
||||||
if (dataLen > 8192) {
|
if (/*dataLen > 8192*/ 0) {
|
||||||
char* p = taosMemoryMalloc(dataLen);
|
char* p = taosMemoryMalloc(dataLen);
|
||||||
int32_t len =
|
int32_t len =
|
||||||
tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0);
|
tsCompressString(((SRetrieveTableRsp *)rsp)->data, dataLen, 1, p, dataLen, ONE_STAGE_COMP, NULL, 0);
|
||||||
|
@ -918,8 +926,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
((SRetrieveTableRsp*)rsp)->compressed = 1;
|
((SRetrieveTableRsp*)rsp)->compressed = 1;
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
} else {
|
} else {
|
||||||
((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen;
|
// ((SRetrieveTableRsp*)rsp)->payloadLen = ((SRetrieveTableRsp*)rsp)->compLen;
|
||||||
((SRetrieveTableRsp*)rsp)->compressed = 0;
|
// ((SRetrieveTableRsp*)rsp)->compressed = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1484,6 +1492,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
|
||||||
SQWorker *mgmt = (SQWorker *)pMgmt;
|
SQWorker *mgmt = (SQWorker *)pMgmt;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
int32_t rawLen = 0;
|
||||||
SQWTaskCtx *ctx = NULL;
|
SQWTaskCtx *ctx = NULL;
|
||||||
void *rsp = NULL;
|
void *rsp = NULL;
|
||||||
bool queryStop = false;
|
bool queryStop = false;
|
||||||
|
@ -1491,7 +1500,6 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
|
||||||
SQWPhaseInput input = {0};
|
SQWPhaseInput input = {0};
|
||||||
|
|
||||||
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
|
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
|
||||||
|
|
||||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
|
||||||
ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH;
|
ctx->fetchMsgType = TDMT_SCH_MERGE_FETCH;
|
||||||
|
@ -1500,7 +1508,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput));
|
||||||
|
|
||||||
if (NULL == rsp) {
|
if (NULL == rsp) {
|
||||||
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
|
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
|
||||||
|
@ -1509,7 +1517,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
|
||||||
} else {
|
} else {
|
||||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||||
|
|
||||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
|
||||||
if (qComplete) {
|
if (qComplete) {
|
||||||
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
atomic_store_8((int8_t *)&ctx->queryEnd, true);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue