Java Reactive Streams Driver

Aggregation Framework

On this page

  • Prerequisites
  • Connect to a MongoDB Deployment
  • Perform Aggregation
  • Use Aggregation Expressions
  • Explain an Aggregation
  • Atlas Search
  • Create Pipeline Search Stages
  • Additional Information
  • API Documentation

The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.

To learn more about aggregation, see Aggregation Pipeline in the Server manual.

You must set up the following components to run the code examples in this guide:

  • A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.

  • The following import statements:

import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import java.util.Arrays;

Important

This guide uses custom Subscriber implementations, which are described in the Sample Custom Subscriber Implementations guide.
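As a rough idea of what such a subscriber does, the sketch below stores every emitted item and lets the caller wait until the stream finishes. It is not the guide's PrintDocumentSubscriber. To run without the driver on the classpath, it assumes the JDK's java.util.concurrent.Flow interfaces, which mirror the four methods of org.reactivestreams.Subscriber, and uses SubmissionPublisher as a stand-in for a driver publisher. The CollectingSubscriber class and its await() helper are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber: stores each item and signals when the stream ends.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> received = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable error;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Nothing is delivered until the subscriber requests items.
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable t) {
        error = t;
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    // Blocks until onComplete or onError, then returns the collected items.
    List<T> await(long timeout, TimeUnit unit) {
        try {
            if (!done.await(timeout, unit)) {
                throw new IllegalStateException("Timed out waiting for the publisher");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        if (error != null) {
            throw new IllegalStateException("Publisher signalled an error", error);
        }
        return received;
    }

    public static void main(String[] args) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        // SubmissionPublisher stands in for a driver publisher in this sketch.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("first");
            publisher.submit("second");
        } // close() sends onComplete after the buffered items are delivered
        System.out.println(subscriber.await(5, TimeUnit.SECONDS)); // prints [first, second]
    }
}
```

With the driver, the same four callbacks would be implemented against org.reactivestreams.Subscriber and the subscriber passed to the publisher returned by aggregate().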

First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.

The following code connects to a standalone MongoDB deployment running on localhost on port 27017. Then, it defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");

To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.

To perform aggregation, pass a list of aggregation stages to the MongoCollection.aggregate() method. The driver provides the Aggregates helper class that contains builders for aggregation stages.

In this example, the aggregation pipeline performs the following tasks:

  • Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.match() to build the $match stage.

  • Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression.

collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))
    )
).subscribe(new PrintDocumentSubscriber());
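To make the result of this pipeline concrete, the following sketch computes the same kind of count in plain Java over a small in-memory list. It is only an illustration of what the $match and $group stages calculate, not how the server executes them. The Restaurant record, the sample values, and the countBakeriesByStars() method are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class MatchGroupSketch {
    // Hypothetical stand-in for a restaurant document.
    record Restaurant(String name, int stars, List<String> categories) {}

    // Mirrors $match on categories containing "Bakery", then $group by stars with a count.
    static Map<Integer, Long> countBakeriesByStars(List<Restaurant> restaurants) {
        return restaurants.stream()
                .filter(r -> r.categories().contains("Bakery"))
                // TreeMap only keeps the printed output in a stable order.
                .collect(Collectors.groupingBy(Restaurant::stars, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration.
        List<Restaurant> sample = List.of(
                new Restaurant("Sun Bakery", 4, List.of("Bakery", "Cafe")),
                new Restaurant("Blue Mill", 5, List.of("Bakery")),
                new Restaurant("Noodle Bar", 4, List.of("Noodles")),
                new Restaurant("Crumb", 4, List.of("Bakery", "Desserts")));
        System.out.println(countBakeriesByStars(sample)); // prints {4=2, 5=1}
    }
}
```

Each map entry corresponds to one document emitted by the $group stage: the key plays the role of _id and the value plays the role of count.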

For $group accumulator expressions, the driver provides the Accumulators helper class. For other aggregation expressions, manually build the expression by using the Document class.

In the following example, the aggregation pipeline uses a $project stage to return only the name field and the calculated field firstCategory whose value is the first element in the categories array. The example uses Aggregates.project() and various Projections class methods to build the $project stage:

collection.aggregate(
    Arrays.asList(
        Aggregates.project(
            Projections.fields(
                Projections.excludeId(),
                Projections.include("name"),
                Projections.computed(
                    "firstCategory",
                    new Document("$arrayElemAt", Arrays.asList("$categories", 0))
                )
            )
        )
    )
).subscribe(new PrintDocumentSubscriber());
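The shape of the projected documents can be sketched in plain Java as follows. This is a simplified illustration that assumes categories, when present, is an array. It does not model how the server treats other cases. The project() method and the sample document are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProjectFirstCategorySketch {
    // Mirrors the $project stage: drop _id, keep name, add firstCategory = categories[0].
    static Map<String, Object> project(Map<String, Object> doc) {
        Map<String, Object> out = new LinkedHashMap<>();
        if (doc.containsKey("name")) {
            out.put("name", doc.get("name"));
        }
        // Sketch simplification: a missing or empty array leaves firstCategory out.
        if (doc.get("categories") instanceof List<?> categories && !categories.isEmpty()) {
            out.put("firstCategory", categories.get(0));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample document for illustration.
        Map<String, Object> doc = Map.of(
                "_id", 1,
                "name", "Sun Bakery",
                "categories", List.of("Bakery", "Cafe"));
        System.out.println(project(doc)); // prints {name=Sun Bakery, firstCategory=Bakery}
    }
}
```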

To explain an aggregation pipeline, call the AggregatePublisher.explain() method:

collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))
    )
)
.explain()
.subscribe(new PrintDocumentSubscriber());

You can perform an Atlas Search query by creating and running an aggregation pipeline that contains one of the following pipeline stages:

  • $search

  • $searchMeta

The Java Reactive Streams driver provides the Aggregates.search() and Aggregates.searchMeta() methods to perform Atlas Search queries.

To learn more about Atlas Search pipeline stages, see Choose the Aggregation Pipeline Stage in the Atlas documentation.

You can create the search criteria in your Atlas Search pipeline stage by using Search operators.

The Java Reactive Streams driver provides helper methods for the following operators:

  • autocomplete: Performs a search for a word or phrase that contains a sequence of characters from an incomplete input string.

  • compound: Combines two or more operators into a single query.

  • equals: Checks whether a field matches a value you specify. Maps to the equals() and equalsNull() methods.

  • exists: Tests if a path to a specified indexed field name exists in a document.

  • in: Performs a search for an array of BSON number, date, boolean, objectId, uuid, or string values at the given path and returns documents where the value of the field equals any value in the specified array.

  • moreLikeThis: Returns documents similar to input documents.

  • near: Supports querying and scoring numeric, date, and GeoJSON point values.

  • phrase: Performs a search for documents containing an ordered sequence of terms using the analyzer specified in the index configuration.

  • queryString: Supports querying a combination of indexed fields and values.

  • range: Supports querying and scoring numeric, date, and string values. Maps to the numberRange() and dateRange() methods.

  • regex: Interprets the query field as a regular expression.

  • text: Performs a full-text search using the analyzer that you specify in the index configuration.

  • wildcard: Enables queries which use special characters in the search string that can match any character.

Note

Atlas Sample Dataset

This example uses the sample_mflix.movies collection from the Atlas sample datasets. To learn how to set up a free-tier Atlas cluster and load the sample dataset, see the Get Started with Atlas tutorial in the Atlas documentation.

Before you can run this example, you must create an Atlas Search index on the movies collection that has the following definition:

{
"mappings": {
"dynamic": true,
"fields": {
"title": {
"analyzer": "lucene.keyword",
"type": "string"
},
"genres": {
"normalizer": "lowercase",
"type": "token"
}
}
}
}

To learn more about creating Atlas Search indexes, see the Atlas Search Index Management section of the Indexes guide.

The following code creates a $search stage that has the following specifications:

  • Checks that the genres array includes "Comedy"

  • Searches the fullplot field for the phrase "new york"

  • Matches year values between 1950 and 2000, exclusive

  • Searches for title values that begin with the term "Love"

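import static com.mongodb.client.model.search.SearchPath.fieldPath;
import com.mongodb.client.model.search.SearchOperator;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.List;

// Assumes movies is a MongoCollection<Document> for sample_mflix.movies on your Atlas cluster

// Every clause passed to filter() must match for a document to be returned
Bson searchStageFilters = Aggregates.search(
    SearchOperator.compound()
        .filter(
            List.of(
                SearchOperator.in(fieldPath("genres"), List.of("Comedy")),
                SearchOperator.phrase(fieldPath("fullplot"), "new york"),
                SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
                SearchOperator.wildcard(fieldPath("title"), "Love *")
            )));

// Return only the title, year, and genres fields (plus _id)
Bson projection = Aggregates.project(Projections.fields(
    Projections.include("title", "year", "genres")
));

List<Bson> aggregateStages = List.of(searchStageFilters, projection);
Publisher<Document> publisher = movies.aggregate(aggregateStages);
publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
// Subscribes a second time and blocks until that subscription yields its first result
Mono.from(publisher).block();

{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}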
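The way the filter clauses combine can be sketched in plain Java: a document is kept only when every clause matches. The predicates below are rough approximations chosen for illustration. They do not reproduce Atlas Search analyzers, normalizers, or scoring. The Movie record, the sample values, and the matchesAll() method are hypothetical.

```java
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

final class CompoundFilterSketch {
    // Hypothetical stand-in for a movie document.
    record Movie(String title, int year, List<String> genres, String fullplot) {}

    // Rough in-memory approximations of the four filter clauses.
    static final List<Predicate<Movie>> CLAUSES = List.of(
            m -> m.genres().stream().anyMatch(g -> g.equalsIgnoreCase("Comedy")),
            m -> m.fullplot().toLowerCase(Locale.ROOT).contains("new york"),
            m -> m.year() > 1950 && m.year() < 2000, // gtLt: both bounds excluded
            m -> m.title().startsWith("Love "));

    // A compound filter keeps a document only if every clause matches.
    static boolean matchesAll(Movie movie) {
        return CLAUSES.stream().allMatch(clause -> clause.test(movie));
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration.
        Movie match = new Movie("Love in Transit", 1985, List.of("Comedy"), "Two commuters meet in New York.");
        Movie boundaryYear = new Movie("Love in Transit II", 2000, List.of("Comedy"), "Back in New York.");
        Movie otherTitle = new Movie("Lovely Day", 1985, List.of("Comedy"), "A day in New York.");
        System.out.println(matchesAll(match));        // prints true
        System.out.println(matchesAll(boundaryYear)); // prints false
        System.out.println(matchesAll(otherTitle));   // prints false
    }
}
```

The second sample fails only because gtLt() excludes both bounds. A gteLte() range would include the years 1950 and 2000.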

To learn more about the Atlas Search helper methods, see the SearchOperator interface reference in the Driver Core API documentation.

To view a full list of expression operators, see Aggregation Operators in the MongoDB Server manual.

To learn about assembling an aggregation pipeline and view examples, see Aggregation Pipeline in the MongoDB Server manual.

To learn more about creating pipeline stages, see Aggregation Stages in the MongoDB Server manual.

To learn more about explaining MongoDB operations, see Explain Output and Query Plans in the MongoDB Server manual.

To learn more about the classes and methods mentioned in this guide, see the driver's API documentation.
