Merge pull request #7007 from taosdata/fix/TD-5474
[TD-5474]<fix> fix runtime error
This commit is contained in:
commit
6f62e807a3
|
@ -199,7 +199,7 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
|
||||||
|
|
||||||
for (int32_t i = dataFields; i >= 0; i--) {
|
for (int32_t i = dataFields; i >= 0; i--) {
|
||||||
httpJsonItemToken(jsonBuf);
|
httpJsonItemToken(jsonBuf);
|
||||||
if (row[i] == NULL) {
|
if (row == NULL || i >= num_fields || row[i] == NULL) {
|
||||||
httpJsonOriginString(jsonBuf, "null", 4);
|
httpJsonOriginString(jsonBuf, "null", 4);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,12 +97,47 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
|
||||||
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
||||||
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
|
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
|
||||||
|
|
||||||
|
#define TSKEY_MAX_ADD(a,b) \
|
||||||
|
do { \
|
||||||
|
if (a < 0) { a = a + b; break;} \
|
||||||
|
if (sizeof(a) == sizeof(int32_t)) { \
|
||||||
|
if((b) > 0 && ((b) >= INT32_MAX - (a))){\
|
||||||
|
a = INT32_MAX; \
|
||||||
|
} else { \
|
||||||
|
a = a + b; \
|
||||||
|
} \
|
||||||
|
} else { \
|
||||||
|
if((b) > 0 && ((b) >= INT64_MAX - (a))){\
|
||||||
|
a = INT64_MAX; \
|
||||||
|
} else { \
|
||||||
|
a = a + b; \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
} while(0)
|
||||||
|
|
||||||
|
#define TSKEY_MIN_SUB(a,b) \
|
||||||
|
do { \
|
||||||
|
if (a >= 0) { a = a + b; break;} \
|
||||||
|
if (sizeof(a) == sizeof(int32_t)){ \
|
||||||
|
if((b) < 0 && ((b) <= INT32_MIN - (a))){\
|
||||||
|
a = INT32_MIN; \
|
||||||
|
} else { \
|
||||||
|
a = a + b; \
|
||||||
|
} \
|
||||||
|
} else { \
|
||||||
|
if((b) < 0 && ((b) <= INT64_MIN-(a))) {\
|
||||||
|
a = INT64_MIN; \
|
||||||
|
} else { \
|
||||||
|
a = a + b; \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
uint64_t queryHandleId = 0;
|
uint64_t queryHandleId = 0;
|
||||||
|
|
||||||
int32_t getMaximumIdleDurationSec() {
|
int32_t getMaximumIdleDurationSec() {
|
||||||
return tsShellActivityTimer * 2;
|
return tsShellActivityTimer * 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t genQueryId(void) {
|
int64_t genQueryId(void) {
|
||||||
int64_t uid = 0;
|
int64_t uid = 0;
|
||||||
int64_t did = tsDnodeId;
|
int64_t did = tsDnodeId;
|
||||||
|
@ -3124,7 +3159,9 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt
|
||||||
|| pLocalExprInfo->base.resType == TSDB_DATA_TYPE_TIMESTAMP) {
|
|| pLocalExprInfo->base.resType == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->base.resBytes);
|
memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i64, pLocalExprInfo->base.resBytes);
|
||||||
} else {
|
} else {
|
||||||
memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen);
|
if (pCtx[idx].tag.pz != NULL) {
|
||||||
|
memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += pLocalExprInfo->base.resBytes;
|
offset += pLocalExprInfo->base.resBytes;
|
||||||
|
@ -3934,8 +3971,8 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
|
||||||
|
|
||||||
// refactor : extract method
|
// refactor : extract method
|
||||||
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||||
|
//add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||||
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
|
||||||
STimeWindow* w = &pBlock->info.window;
|
STimeWindow* w = &pBlock->info.window;
|
||||||
w->skey = *(int64_t*)pInfoData->pData;
|
w->skey = *(int64_t*)pInfoData->pData;
|
||||||
w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pBlock->info.rows - 1));
|
w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pBlock->info.rows - 1));
|
||||||
|
@ -5273,7 +5310,15 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
||||||
|
|
||||||
TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1;
|
TSKEY key = 0;
|
||||||
|
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
||||||
|
key = pBlock->info.window.ekey;
|
||||||
|
TSKEY_MAX_ADD(key, 1);
|
||||||
|
} else {
|
||||||
|
key = pBlock->info.window.skey;
|
||||||
|
TSKEY_MIN_SUB(key, -1);
|
||||||
|
}
|
||||||
|
|
||||||
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key);
|
setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key);
|
||||||
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
|
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
|
@ -722,7 +722,8 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
||||||
// OK,let's load row from backward to get not-null column
|
// OK,let's load row from backward to get not-null column
|
||||||
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
|
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
|
||||||
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
|
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
|
||||||
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset);
|
const void* pColData = tdGetColDataOfRow(pDataCol, rowId);
|
||||||
|
tdAppendColVal(memRowDataBody(row), pColData, pCol->type, pCol->offset);
|
||||||
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
|
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
|
||||||
void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
|
void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
|
||||||
if (isNull(value, pCol->type)) {
|
if (isNull(value, pCol->type)) {
|
||||||
|
@ -735,11 +736,12 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// save not-null column
|
// save not-null column
|
||||||
|
uint16_t bytes = IS_VAR_DATA_TYPE(pCol->type) ? varDataTLen(pColData) : pCol->bytes;
|
||||||
SDataCol *pLastCol = &(pTable->lastCols[idx]);
|
SDataCol *pLastCol = &(pTable->lastCols[idx]);
|
||||||
pLastCol->pData = malloc(pCol->bytes);
|
pLastCol->pData = malloc(bytes);
|
||||||
pLastCol->bytes = pCol->bytes;
|
pLastCol->bytes = bytes;
|
||||||
pLastCol->colId = pCol->colId;
|
pLastCol->colId = pCol->colId;
|
||||||
memcpy(pLastCol->pData, value, pCol->bytes);
|
memcpy(pLastCol->pData, value, bytes);
|
||||||
|
|
||||||
// save row ts(in column 0)
|
// save row ts(in column 0)
|
||||||
pDataCol = pReadh->pDCols[0]->cols + 0;
|
pDataCol = pReadh->pDCols[0]->cols + 0;
|
||||||
|
|
|
@ -1019,7 +1019,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
|
||||||
|
|
||||||
if (isDataRow) {
|
if (isDataRow) {
|
||||||
value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pTCol->type,
|
value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pTCol->type,
|
||||||
TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
|
TD_DATA_ROW_HEAD_SIZE + pTCol->offset);
|
||||||
} else {
|
} else {
|
||||||
// SKVRow
|
// SKVRow
|
||||||
SColIdx *pColIdx = tdGetKVRowIdxOfCol(memRowKvBody(row), pTCol->colId);
|
SColIdx *pColIdx = tdGetKVRowIdxOfCol(memRowKvBody(row), pTCol->colId);
|
||||||
|
@ -1034,14 +1034,17 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
|
||||||
|
|
||||||
SDataCol *pDataCol = &(pLatestCols[idx]);
|
SDataCol *pDataCol = &(pLatestCols[idx]);
|
||||||
if (pDataCol->pData == NULL) {
|
if (pDataCol->pData == NULL) {
|
||||||
pDataCol->pData = malloc(pSchema->columns[j].bytes);
|
pDataCol->pData = malloc(pTCol->bytes);
|
||||||
pDataCol->bytes = pSchema->columns[j].bytes;
|
pDataCol->bytes = pTCol->bytes;
|
||||||
} else if (pDataCol->bytes < pSchema->columns[j].bytes) {
|
} else if (pDataCol->bytes < pTCol->bytes) {
|
||||||
pDataCol->pData = realloc(pDataCol->pData, pSchema->columns[j].bytes);
|
pDataCol->pData = realloc(pDataCol->pData, pTCol->bytes);
|
||||||
pDataCol->bytes = pSchema->columns[j].bytes;
|
pDataCol->bytes = pTCol->bytes;
|
||||||
}
|
}
|
||||||
|
// the actual value size
|
||||||
memcpy(pDataCol->pData, value, pDataCol->bytes);
|
uint16_t bytes = IS_VAR_DATA_TYPE(pTCol->type) ? varDataTLen(value) : pTCol->bytes;
|
||||||
|
// the actual data size CANNOT larger than column size
|
||||||
|
assert(pTCol->bytes >= bytes);
|
||||||
|
memcpy(pDataCol->pData, value, bytes);
|
||||||
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
|
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
|
||||||
pDataCol->ts = memRowKey(row);
|
pDataCol->ts = memRowKey(row);
|
||||||
}
|
}
|
||||||
|
|
|
@ -640,7 +640,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
|
||||||
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
|
||||||
|
|
||||||
STableGroupInfo* pNew = calloc(1, sizeof(STableGroupInfo));
|
STableGroupInfo* pNew = calloc(1, sizeof(STableGroupInfo));
|
||||||
pNew->pGroupList = taosArrayInit(numOfGroup, sizeof(SArray));
|
pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroup; ++i) {
|
for(int32_t i = 0; i < numOfGroup; ++i) {
|
||||||
SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
|
SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
|
||||||
|
@ -3383,11 +3383,13 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa
|
||||||
type = TSDB_DATA_TYPE_BINARY;
|
type = TSDB_DATA_TYPE_BINARY;
|
||||||
bytes = tGetTbnameColumnSchema()->bytes;
|
bytes = tGetTbnameColumnSchema()->bytes;
|
||||||
} else {
|
} else {
|
||||||
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
|
if (pTableGroupSupp->pTagSchema && colIndex < pTableGroupSupp->pTagSchema->numOfCols) {
|
||||||
bytes = pCol->bytes;
|
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
|
||||||
type = pCol->type;
|
bytes = pCol->bytes;
|
||||||
f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
|
type = pCol->type;
|
||||||
f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
|
f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
|
||||||
|
f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this tags value may be NULL
|
// this tags value may be NULL
|
||||||
|
|
|
@ -159,7 +159,7 @@ int tsCompressINTImp(const char *const input, const int nelements, char *const o
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
// Get difference.
|
// Get difference.
|
||||||
if (!safeInt64Add(curr_value, -prev_value)) goto _copy_and_exit;
|
if (!safeInt64Add(curr_value, -prev_value_tmp)) goto _copy_and_exit;
|
||||||
|
|
||||||
int64_t diff = curr_value - prev_value_tmp;
|
int64_t diff = curr_value - prev_value_tmp;
|
||||||
// Zigzag encode the value.
|
// Zigzag encode the value.
|
||||||
|
|
Loading…
Reference in New Issue