diff --git a/psalm-baseline.xml b/psalm-baseline.xml
index f51206d0d..3a55977f8 100644
--- a/psalm-baseline.xml
+++ b/psalm-baseline.xml
@@ -191,6 +191,7 @@
+
@@ -198,6 +199,12 @@
+
+
+
+
+
+
@@ -220,9 +227,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -237,9 +257,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -857,6 +890,7 @@
+
diff --git a/src/Client.php b/src/Client.php
index 5b02aa44c..2e161707b 100644
--- a/src/Client.php
+++ b/src/Client.php
@@ -22,6 +22,7 @@
use MongoDB\BSON\Document;
use MongoDB\BSON\PackedArray;
use MongoDB\Builder\BuilderEncoder;
+use MongoDB\Builder\Pipeline;
use MongoDB\Codec\Encoder;
use MongoDB\Driver\ClientEncryption;
use MongoDB\Driver\Exception\InvalidArgumentException as DriverInvalidArgumentException;
@@ -391,6 +392,12 @@ public function startSession(array $options = [])
*/
public function watch(array $pipeline = [], array $options = [])
{
+ if (is_builder_pipeline($pipeline)) {
+ $pipeline = new Pipeline(...$pipeline);
+ }
+
+ $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference;
}
diff --git a/src/Collection.php b/src/Collection.php
index 7769b9a5b..8ed460c4e 100644
--- a/src/Collection.php
+++ b/src/Collection.php
@@ -23,6 +23,7 @@
use MongoDB\BSON\JavascriptInterface;
use MongoDB\BSON\PackedArray;
use MongoDB\Builder\BuilderEncoder;
+use MongoDB\Builder\Pipeline;
use MongoDB\Codec\DocumentCodec;
use MongoDB\Codec\Encoder;
use MongoDB\Driver\CursorInterface;
@@ -223,6 +224,12 @@ public function __toString()
*/
public function aggregate(array $pipeline, array $options = [])
{
+ if (is_builder_pipeline($pipeline)) {
+ $pipeline = new Pipeline(...$pipeline);
+ }
+
+ $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
$hasWriteStage = is_last_pipeline_operator_write($pipeline);
$options = $this->inheritReadPreference($options);
@@ -1098,6 +1105,12 @@ public function updateSearchIndex(string $name, array|object $definition, array
*/
public function watch(array $pipeline = [], array $options = [])
{
+ if (is_builder_pipeline($pipeline)) {
+ $pipeline = new Pipeline(...$pipeline);
+ }
+
+ $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
$options = $this->inheritReadOptions($options);
$options = $this->inheritCodecOrTypeMap($options);
diff --git a/src/Database.php b/src/Database.php
index 8b7742b70..125012c81 100644
--- a/src/Database.php
+++ b/src/Database.php
@@ -21,6 +21,7 @@
use MongoDB\BSON\Document;
use MongoDB\BSON\PackedArray;
use MongoDB\Builder\BuilderEncoder;
+use MongoDB\Builder\Pipeline;
use MongoDB\Codec\Encoder;
use MongoDB\Driver\ClientEncryption;
use MongoDB\Driver\Cursor;
@@ -202,6 +203,12 @@ public function __toString()
*/
public function aggregate(array $pipeline, array $options = [])
{
+ if (is_builder_pipeline($pipeline)) {
+ $pipeline = new Pipeline(...$pipeline);
+ }
+
+ $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
$hasWriteStage = is_last_pipeline_operator_write($pipeline);
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
@@ -611,6 +618,12 @@ public function selectGridFSBucket(array $options = [])
*/
public function watch(array $pipeline = [], array $options = [])
{
+ if (is_builder_pipeline($pipeline)) {
+ $pipeline = new Pipeline(...$pipeline);
+ }
+
+ $pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
+
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
$options['readPreference'] = $this->readPreference;
}
diff --git a/src/functions.php b/src/functions.php
index ca30fdbc0..a445467ba 100644
--- a/src/functions.php
+++ b/src/functions.php
@@ -21,6 +21,7 @@
use MongoDB\BSON\Document;
use MongoDB\BSON\PackedArray;
use MongoDB\BSON\Serializable;
+use MongoDB\Builder\Type\StageInterface;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadPreference;
@@ -327,6 +328,27 @@ function is_pipeline(array|object $pipeline, bool $allowEmpty = false): bool
return true;
}
+/**
+ * Returns whether the argument is a list that contains at least one
+ * {@see StageInterface} object.
+ *
+ * @internal
+ */
+function is_builder_pipeline(array $pipeline): bool
+{
+ if (! $pipeline || ! array_is_list($pipeline)) {
+ return false;
+ }
+
+ foreach ($pipeline as $stage) {
+ if (is_object($stage) && $stage instanceof StageInterface) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
/**
* Returns whether we are currently in a transaction.
*
diff --git a/tests/ClientFunctionalTest.php b/tests/ClientFunctionalTest.php
index 3c04ecd67..ca0ea2722 100644
--- a/tests/ClientFunctionalTest.php
+++ b/tests/ClientFunctionalTest.php
@@ -2,6 +2,9 @@
namespace MongoDB\Tests;
+use MongoDB\Builder\Pipeline;
+use MongoDB\Builder\Query;
+use MongoDB\Builder\Stage;
use MongoDB\Client;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Command;
@@ -13,6 +16,7 @@
use function call_user_func;
use function is_callable;
+use function iterator_to_array;
use function sprintf;
/**
@@ -137,4 +141,25 @@ public function testAddAndRemoveSubscriber(): void
$client->getManager()->executeCommand('admin', new Command(['ping' => 1]));
}
+
+ public function testWatchWithBuilderPipeline(): void
+ {
+ $this->skipIfChangeStreamIsNotSupported();
+
+ if ($this->isShardedCluster()) {
+ $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
+ }
+
+ $pipeline = new Pipeline(
+ Stage::match(operationType: Query::eq('insert')),
+ );
+ // Extract the list of stages for arg type restriction
+ $pipeline = iterator_to_array($pipeline);
+
+ $changeStream = $this->client->watch($pipeline);
+ $this->client->selectCollection($this->getDatabaseName(), $this->getCollectionName())->insertOne(['x' => 3]);
+ $changeStream->next();
+ $this->assertTrue($changeStream->valid());
+ $this->assertEquals('insert', $changeStream->current()->operationType);
+ }
}
diff --git a/tests/Collection/BuilderCollectionFunctionalTest.php b/tests/Collection/BuilderCollectionFunctionalTest.php
index 15a89cda5..2ace15109 100644
--- a/tests/Collection/BuilderCollectionFunctionalTest.php
+++ b/tests/Collection/BuilderCollectionFunctionalTest.php
@@ -2,10 +2,13 @@
namespace MongoDB\Tests\Collection;
+use MongoDB\Builder\Expression;
use MongoDB\Builder\Pipeline;
use MongoDB\Builder\Query;
use MongoDB\Builder\Stage;
+use function iterator_to_array;
+
class BuilderCollectionFunctionalTest extends FunctionalTestCase
{
public function setUp(): void
@@ -17,7 +20,18 @@ public function setUp(): void
public function testAggregate(): void
{
- $this->markTestSkipped('Not supported yet');
+ $this->collection->insertMany([['x' => 10], ['x' => 10], ['x' => 10]]);
+ $pipeline = new Pipeline(
+ Stage::bucketAuto(
+ groupBy: Expression::intFieldPath('x'),
+ buckets: 2,
+ ),
+ );
+ // Extract the list of stages for arg type restriction
+ $pipeline = iterator_to_array($pipeline);
+
+ $results = $this->collection->aggregate($pipeline)->toArray();
+ $this->assertCount(2, $results);
}
public function testBulkWriteDeleteMany(): void
@@ -245,6 +259,22 @@ public function testUpdateManyWithPipeline(): void
public function testWatch(): void
{
- $this->markTestSkipped('Not supported yet');
+ $this->skipIfChangeStreamIsNotSupported();
+
+ if ($this->isShardedCluster()) {
+ $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
+ }
+
+ $pipeline = new Pipeline(
+ Stage::match(operationType: Query::eq('insert')),
+ );
+ // Extract the list of stages for arg type restriction
+ $pipeline = iterator_to_array($pipeline);
+
+ $changeStream = $this->collection->watch($pipeline);
+ $this->collection->insertOne(['x' => 3]);
+ $changeStream->next();
+ $this->assertTrue($changeStream->valid());
+ $this->assertEquals('insert', $changeStream->current()->operationType);
}
}
diff --git a/tests/Database/BuilderDatabaseFunctionalTest.php b/tests/Database/BuilderDatabaseFunctionalTest.php
new file mode 100644
index 000000000..9b89d87e1
--- /dev/null
+++ b/tests/Database/BuilderDatabaseFunctionalTest.php
@@ -0,0 +1,63 @@
+dropCollection($this->getDatabaseName(), $this->getCollectionName());
+
+ parent::tearDown();
+ }
+
+ public function testAggregate(): void
+ {
+ $this->skipIfServerVersion('<', '6.0.0', '$documents stage is not supported');
+
+ $pipeline = new Pipeline(
+ Stage::documents([
+ ['x' => 1],
+ ['x' => 2],
+ ['x' => 3],
+ ]),
+ Stage::bucketAuto(
+ groupBy: Expression::intFieldPath('x'),
+ buckets: 2,
+ ),
+ );
+ // Extract the list of stages for arg type restriction
+ $pipeline = iterator_to_array($pipeline);
+
+ $results = $this->database->aggregate($pipeline)->toArray();
+ $this->assertCount(2, $results);
+ }
+
+ public function testWatch(): void
+ {
+ $this->skipIfChangeStreamIsNotSupported();
+
+ if ($this->isShardedCluster()) {
+ $this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
+ }
+
+ $pipeline = new Pipeline(
+ Stage::match(operationType: Query::eq('insert')),
+ );
+ // Extract the list of stages for arg type restriction
+ $pipeline = iterator_to_array($pipeline);
+
+ $changeStream = $this->database->watch($pipeline);
+ $this->database->selectCollection($this->getCollectionName())->insertOne(['x' => 3]);
+ $changeStream->next();
+ $this->assertTrue($changeStream->valid());
+ $this->assertEquals('insert', $changeStream->current()->operationType);
+ }
+}
diff --git a/tests/FunctionsTest.php b/tests/FunctionsTest.php
index 7098cb24a..a673ef719 100644
--- a/tests/FunctionsTest.php
+++ b/tests/FunctionsTest.php
@@ -4,6 +4,8 @@
use MongoDB\BSON\Document;
use MongoDB\BSON\PackedArray;
+use MongoDB\Builder\Stage\LimitStage;
+use MongoDB\Builder\Stage\MatchStage;
use MongoDB\Driver\WriteConcern;
use MongoDB\Model\BSONArray;
use MongoDB\Model\BSONDocument;
@@ -12,6 +14,7 @@
use function MongoDB\apply_type_map_to_document;
use function MongoDB\create_field_path_type_map;
use function MongoDB\document_to_array;
+use function MongoDB\is_builder_pipeline;
use function MongoDB\is_first_key_operator;
use function MongoDB\is_last_pipeline_operator_write;
use function MongoDB\is_mapreduce_output_inline;
@@ -311,6 +314,21 @@ public function providePipelines(): array
];
}
+ /** @dataProvider provideStagePipelines */
+ public function testIsBuilderPipeline($expected, $pipeline): void
+ {
+ $this->assertSame($expected, is_builder_pipeline($pipeline));
+ }
+
+ public function provideStagePipelines(): iterable
+ {
+ yield 'empty array' => [false, []];
+ yield 'array of arrays' => [false, [['$match' => ['x' => 1]]]];
+ yield 'map of stages' => [false, [1 => new MatchStage([])]];
+ yield 'stages' => [true, [new MatchStage([]), new LimitStage(1)]];
+ yield 'stages and operators' => [true, [new MatchStage([]), ['$limit' => 1]]];
+ }
+
/** @dataProvider provideWriteConcerns */
public function testIsWriteConcernAcknowledged($expected, WriteConcern $writeConcern): void
{