Aggregation Framework
The aggregation pipeline is a framework for data aggregation, modeled on the concept of data processing pipelines.
To learn more about aggregation, see Aggregation Pipeline in the Server manual.
Prerequisites
You must set up the following components to run the code examples in this guide:
- A test.restaurants collection populated with documents from the restaurants.json file in the documentation assets GitHub repository.
- The following import statements:

import java.util.Arrays;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;
Important
This guide uses custom Subscriber implementations, which are described in the Sample Custom Subscriber Implementations guide.
Connect to a MongoDB Deployment
First, connect to a MongoDB deployment, then declare and define MongoDatabase and MongoCollection instances.

The following code connects to a standalone MongoDB deployment running on localhost on port 27017. Then, it defines the database variable to refer to the test database and the collection variable to refer to the restaurants collection:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");
To learn more about connecting to MongoDB deployments, see the Connect to MongoDB tutorial.
Perform Aggregation
To perform aggregation, pass a list of aggregation stages to the
MongoCollection.aggregate()
method. The driver provides the
Aggregates
helper class that contains builders for aggregation
stages.
In this example, the aggregation pipeline performs the following tasks:
- Uses a $match stage to filter for documents in which the categories array field contains the element "Bakery". The example uses Aggregates.match() to build the $match stage.
- Uses a $group stage to group the matching documents by the stars field, accumulating a count of documents for each distinct value of stars. The example uses Aggregates.group() to build the $group stage and Accumulators.sum() to build the accumulator expression. For accumulator expressions used within the $group stage, the driver provides the Accumulators helper class.
collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))
    )
).subscribe(new PrintDocumentSubscriber());
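Conceptually, the pipeline above filters documents and then produces a count per distinct stars value. The following plain-Java sketch (standard library only, not driver code; the in-memory sample documents are hypothetical) mirrors what the $match and $group stages compute:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of the $match + $group pipeline semantics (not driver code).
public class GroupSketch {
    record Restaurant(List<String> categories, int stars) {}

    // Mirrors: $match (categories contains "Bakery"), then $group by stars with a count
    static Map<Integer, Long> countBakeriesByStars(List<Restaurant> docs) {
        return docs.stream()
            .filter(d -> d.categories().contains("Bakery"))
            .collect(Collectors.groupingBy(Restaurant::stars, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for documents in the restaurants collection
        List<Restaurant> docs = List.of(
            new Restaurant(List.of("Bakery", "Cafe"), 4),
            new Restaurant(List.of("Pizza"), 5),
            new Restaurant(List.of("Bakery"), 4),
            new Restaurant(List.of("Bakery"), 3));

        System.out.println(countBakeriesByStars(docs)); // {3=1, 4=2}
    }
}
```

The server performs the same filter-then-count work inside the aggregation pipeline, so only the grouped results cross the network.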
Use Aggregation Expressions
For $group
accumulator expressions, the driver provides the
Accumulators
helper class. For other aggregation expressions,
manually build the expression by using the Document
class.
In the following example, the aggregation pipeline uses a
$project
stage to return only the name
field and the calculated
field firstCategory
whose value is the first element in the
categories
array. The example uses Aggregates.project()
and
various Projections
class methods to build the $project
stage:
collection.aggregate(
    Arrays.asList(
        Aggregates.project(
            Projections.fields(
                Projections.excludeId(),
                Projections.include("name"),
                Projections.computed(
                    "firstCategory",
                    new Document("$arrayElemAt", Arrays.asList("$categories", 0))
                )
            )
        )
    )
).subscribe(new PrintDocumentSubscriber());
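To see what the computed $arrayElemAt field produces, here is a plain-Java sketch (standard library only, not driver code; the document values are hypothetical) of the projection the stage applies to each document:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the $project stage above (not driver code).
public class ProjectSketch {
    // Mirrors Projections.include("name") plus the computed field:
    // $arrayElemAt with index 0 returns the first element of the array.
    static Map<String, Object> project(String name, List<String> categories) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("name", name);
        out.put("firstCategory", categories.isEmpty() ? null : categories.get(0));
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical restaurant document fields
        System.out.println(project("Moonlit Bakery", List.of("Bakery", "Cafe")));
        // {name=Moonlit Bakery, firstCategory=Bakery}
    }
}
```

Note that $arrayElemAt on an empty or missing array yields no value on the server; the null here is only a stand-in for that case.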
Explain an Aggregation
To explain an aggregation pipeline, call the
an aggregation pipeline, call the
AggregatePublisher.explain()
method:
collection.aggregate(
    Arrays.asList(
        Aggregates.match(Filters.eq("categories", "Bakery")),
        Aggregates.group("$stars", Accumulators.sum("count", 1))))
    .explain()
    .subscribe(new PrintDocumentSubscriber());
Atlas Search
You can perform an Atlas Search query by creating and running an aggregation pipeline that contains one of the following pipeline stages:
$search
$searchMeta
The Java Reactive Streams driver provides the Aggregates.search() and Aggregates.searchMeta() methods to perform Atlas Search queries.
To learn more about Atlas Search pipeline stages, see Choose the Aggregation Pipeline Stage in the Atlas documentation.
Create Pipeline Search Stages
You can create the search criteria in your Atlas Search pipeline stage by using Search operators.
The Java Reactive Streams driver provides helper methods for the following operators:
Operator | Description
---|---
autocomplete | Performs a search for a word or phrase that contains a sequence of characters from an incomplete input string.
compound | Combines two or more operators into a single query.
equals | Checks whether a field matches a value you specify. Maps to the equals operator.
exists | Tests if a path to a specified indexed field name exists in a document.
in | Performs a search for an array of BSON number, date, boolean, objectId, uuid, or string values at the given path and returns documents where the value of the field equals any value in the specified array.
moreLikeThis | Returns documents similar to input documents.
near | Supports querying and scoring numeric, date, and GeoJSON point values.
phrase | Performs a search for documents containing an ordered sequence of terms using the analyzer specified in the index configuration.
queryString | Supports querying a combination of indexed fields and values.
range | Supports querying and scoring numeric, date, and string values. Maps to the range operator.
regex | Interprets the query field as a regular expression.
text | Performs a full-text search using the analyzer that you specify in the index configuration.
wildcard | Enables queries which use special characters in the search string that can match any character.
Example Pipeline Search Stage
Note
Atlas Sample Dataset
This example uses the sample_mflix.movies
collection from the Atlas sample
datasets. To learn how to set up a free-tier Atlas cluster and load the
sample dataset, see the Get Started with Atlas tutorial
in the Atlas documentation.
Before you can run this example, you must create an Atlas Search index on the movies
collection that has the following definition:
{
    "mappings": {
        "dynamic": true,
        "fields": {
            "title": {
                "analyzer": "lucene.keyword",
                "type": "string"
            },
            "genres": {
                "normalizer": "lowercase",
                "type": "token"
            }
        }
    }
}
To learn more about creating Atlas Search indexes, see the Atlas Search Index Management section of the Indexes guide.
The following code creates a $search stage that has the following specifications:

- Checks that the genres array includes "Comedy"
- Searches the fullplot field for the phrase "new york"
- Matches year values greater than 1950 and less than 2000
- Searches for title values that begin with the term "Love"
Bson searchStageFilters = Aggregates.search(
    SearchOperator.compound()
        .filter(
            List.of(
                SearchOperator.in(fieldPath("genres"), List.of("Comedy")),
                SearchOperator.phrase(fieldPath("fullplot"), "new york"),
                SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
                SearchOperator.wildcard(fieldPath("title"), "Love *")
            )));

Bson projection = Aggregates.project(Projections.fields(
    Projections.include("title", "year", "genres")
));

List<Bson> aggregateStages = List.of(searchStageFilters, projection);
Publisher<Document> publisher = movies.aggregate(aggregateStages);

publisher.subscribe(new SubscriberHelpers.PrintDocumentSubscriber());
Mono.from(publisher).block();
The code produces output that resembles the following:

{"_id": ..., "genres": ["Comedy", "Romance"], "title": "Love at First Bite", "year": 1979}
{"_id": ..., "genres": ["Comedy", "Drama"], "title": "Love Affair", "year": 1994}
To learn more about the Atlas Search helper methods, see the SearchOperator interface reference in the Driver Core API documentation.
Additional Information
To view a full list of expression operators, see Aggregation Operators in the MongoDB Server manual.
To learn about assembling an aggregation pipeline and view examples, see Aggregation Pipeline in the MongoDB Server manual.
To learn more about creating pipeline stages, see Aggregation Stages in the MongoDB Server manual.
To learn more about explaining MongoDB operations, see Explain Output and Query Plans in the MongoDB Server manual.
API Documentation
To learn more about the classes and methods mentioned in this guide, see the following API documentation: