fix(query): add value check.
This commit is contained in:
parent
97b802ad77
commit
597ea1c2bf
|
@ -1672,7 +1672,12 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!needConvert) return TSDB_CODE_SUCCESS;
|
|
||||||
|
if (!needConvert) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
tscDebug("start to convert form json format string");
|
||||||
|
|
||||||
char* p = (char*)pResultInfo->pData;
|
char* p = (char*)pResultInfo->pData;
|
||||||
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
|
int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);
|
||||||
|
|
|
@ -2120,6 +2120,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
|
||||||
int32_t* rows = (int32_t*)data;
|
int32_t* rows = (int32_t*)data;
|
||||||
*rows = pBlock->info.rows;
|
*rows = pBlock->info.rows;
|
||||||
data += sizeof(int32_t);
|
data += sizeof(int32_t);
|
||||||
|
ASSERT(*rows > 0);
|
||||||
|
|
||||||
int32_t* cols = (int32_t*)data;
|
int32_t* cols = (int32_t*)data;
|
||||||
*cols = numOfCols;
|
*cols = numOfCols;
|
||||||
|
@ -2183,6 +2184,8 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
|
||||||
|
|
||||||
*actualLen = *dataLen;
|
*actualLen = *dataLen;
|
||||||
*groupId = pBlock->info.groupId;
|
*groupId = pBlock->info.groupId;
|
||||||
|
ASSERT(*dataLen > 0);
|
||||||
|
uDebug("build data block, actualLen:%d, rows:%d, cols:%d", *dataLen, *rows, *cols);
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
||||||
|
|
|
@ -168,7 +168,9 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||||
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
||||||
taosFreeQitem(pBuf);
|
taosFreeQitem(pBuf);
|
||||||
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
|
|
||||||
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
|
||||||
|
*pLen = pEntry->dataLen;
|
||||||
*pQueryEnd = pDeleter->queryEnd;
|
*pQueryEnd = pDeleter->queryEnd;
|
||||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
|
blockEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
|
||||||
|
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data+8));
|
||||||
|
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data+8+4));
|
||||||
|
|
||||||
pBuf->useSize += pEntry->dataLen;
|
pBuf->useSize += pEntry->dataLen;
|
||||||
|
|
||||||
|
@ -170,7 +172,13 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
||||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||||
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
|
||||||
taosFreeQitem(pBuf);
|
taosFreeQitem(pBuf);
|
||||||
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
|
|
||||||
|
SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData;
|
||||||
|
*pLen = pEntry->dataLen;
|
||||||
|
|
||||||
|
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data+8));
|
||||||
|
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data+8+4));
|
||||||
|
|
||||||
*pQueryEnd = pDispatcher->queryEnd;
|
*pQueryEnd = pDispatcher->queryEnd;
|
||||||
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
|
qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
|
||||||
}
|
}
|
||||||
|
@ -191,6 +199,9 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
|
||||||
pOutput->numOfCols = pEntry->numOfCols;
|
pOutput->numOfCols = pEntry->numOfCols;
|
||||||
pOutput->compressed = pEntry->compressed;
|
pOutput->compressed = pEntry->compressed;
|
||||||
|
|
||||||
|
ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data+8));
|
||||||
|
ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data+8+4));
|
||||||
|
|
||||||
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
|
atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen);
|
||||||
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue