[td-1373]
This commit is contained in:
parent
0569672503
commit
0f2c6f3181
|
@ -4025,11 +4025,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
|
|||
|
||||
// primary ts must be existed, so no need to check its existance
|
||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||
tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
|
||||
tsBufAppend(pTSbuf, pCtx->param[0].i64Key, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
|
||||
} else {
|
||||
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
|
||||
char *d = GET_INPUT_CHAR_INDEX(pCtx, i);
|
||||
tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE);
|
||||
tsBufAppend(pTSbuf, pCtx->param[0].i64Key, &pCtx->tag, d, TSDB_KEYSIZE);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -158,9 +158,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
|
|||
tsBufDestroy(pSupporter1->pTSBuf);
|
||||
tsBufDestroy(pSupporter2->pTSBuf);
|
||||
|
||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
|
||||
win->skey, win->ekey);
|
||||
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
|
||||
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
|
||||
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1));
|
||||
|
||||
return output1->numOfTotal;
|
||||
}
|
||||
|
|
|
@ -136,6 +136,10 @@ void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
|
|||
*/
|
||||
void tsBufDisplay(STSBuf* pTSBuf);
|
||||
|
||||
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf);
|
||||
|
||||
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1005,9 +1005,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SQInfo* pQInfo = GET_QINFO_ADDR(pQuery);
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
|
||||
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
|
||||
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId);
|
||||
}
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
@ -1469,7 +1470,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
}
|
||||
|
||||
void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY *tsCol, SDataBlockInfo* pBlockInfo,
|
||||
SDataStatis *pStatis, void *param, int32_t colIndex) {
|
||||
SDataStatis *pStatis, void *param, int32_t colIndex, int32_t vgId) {
|
||||
|
||||
int32_t functionId = pQuery->pSelectExpr[colIndex].base.functionId;
|
||||
int32_t colId = pQuery->pSelectExpr[colIndex].base.colInfo.colId;
|
||||
|
@ -1542,6 +1543,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
|
|||
}
|
||||
}
|
||||
}
|
||||
} else if (functionId == TSDB_FUNC_TS_COMP) {
|
||||
pCtx->param[0].i64Key = vgId;
|
||||
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
|
||||
}
|
||||
|
||||
#if defined(_DEBUG_VIEW)
|
||||
|
|
|
@ -561,6 +561,19 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
|
|||
pCur->tsIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : pBlock->numOfElem - 1;
|
||||
}
|
||||
|
||||
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
|
||||
if (offset < 0 || offset >= getDataStartOffset()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
|
||||
int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
|
||||
if (j == -1) {
|
||||
|
@ -915,19 +928,6 @@ static int32_t getDataStartOffset() {
|
|||
return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo);
|
||||
}
|
||||
|
||||
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
|
||||
if (offset < 0 || offset >= getDataStartOffset()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (fseek(pTSBuf->f, (int32_t)offset, SEEK_SET) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// update prev vnode length info in file
|
||||
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) {
|
||||
int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo);
|
||||
|
@ -969,3 +969,29 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
|
|||
pTSBuf->fileSize += getDataStartOffset();
|
||||
return pTSBuf;
|
||||
}
|
||||
|
||||
int32_t tsBufGetNumOfVnodes(STSBuf* pTSBuf) {
|
||||
if (pTSBuf == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return pTSBuf->pData->len;
|
||||
}
|
||||
|
||||
void tsBufGetVnodeIdList(STSBuf* pTSBuf, int32_t* num, int32_t** vnodeId) {
|
||||
int32_t size = tsBugGetNumOfVnodes(pTSBuf);
|
||||
if (num != NULL) {
|
||||
*num = size;
|
||||
}
|
||||
|
||||
*vnodeId = NULL;
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
(*vnodeId) = malloc(tsBugGetNumOfVnodes(pTSBuf) * sizeof(int32_t));
|
||||
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
(*vnodeId)[i] = pTSBuf->pData[i].info.vnode;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue