Skip to content

Commit 4fd7cee

Browse files
authored
Merge pull request #6 from RunOpenCode/dev
Dev
2 parents 5274758 + c19f315 commit 4fd7cee

File tree

5 files changed

+243
-3
lines changed

5 files changed

+243
-3
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Operator;
6+
7+
use RunOpenCode\Component\Dataset\AbstractStream;
8+
use RunOpenCode\Component\Dataset\Contract\OperatorInterface;
9+
10+
/**
11+
* Compress join operator.
12+
*
13+
* Compress join operator iterates over given collection and compresses items based on the predicate and join functions.
14+
*
15+
* Example usage:
16+
*
17+
* ```php
18+
* use RunOpenCode\Component\Dataset\Operator\CompressJoin;
19+
*
20+
* $compressJoin = new CompressJoin(
21+
* collection: new Dataset([1 => [1, 2], 2 => [1, 3], 3 => [1, 4], 4 => [2, 1], 5 => [2, 2]]),
22+
* predicate: static fn(array $values): bool => $values[0][0] === $values[1][0],
23+
* join: static fn(array $buffer): iterable => [
24+
* $buffer[0][1][0] => array_map(static fn(array $record): int => $record[1][1], $buffer)
25+
* ],
26+
* );
27+
* // $compressJoin will yield [1 => [2, 3, 4], 2 => [1, 2]]
28+
* ```
29+
*
30+
* @template TKey
31+
* @template TValue
32+
* @template TModifiedKey
33+
* @template TModifiedValue
34+
*
35+
* @phpstan-type PredicateValues = array{TValue, TValue}
36+
* @phpstan-type PredicateKeys = array{TKey, TKey}
37+
* @phpstan-type Record = array{TKey, TValue}
38+
* @phpstan-type Buffer = list<Record>
39+
* @phpstan-type PredicateCallable = callable(PredicateValues, PredicateKeys=, Buffer=): bool
40+
* @phpstan-type JoinCallable = callable(Buffer): iterable<TModifiedKey, TModifiedValue>
41+
*
42+
* @extends AbstractStream<TModifiedKey, TModifiedValue>
43+
* @implements OperatorInterface<TModifiedKey, TModifiedValue>
44+
*/
45+
final class CompressJoin extends AbstractStream implements OperatorInterface
46+
{
47+
private readonly \Closure $predicate;
48+
49+
private readonly \Closure $join;
50+
51+
/**
52+
* @param iterable<TKey, TValue> $collection Collection to iterate over.
53+
* @param PredicateCallable $predicate Callable predicate function to evaluate.
54+
* @param JoinCallable $join Callable join function to produce joined records.
55+
*/
56+
public function __construct(
57+
private readonly iterable $collection,
58+
callable $predicate,
59+
callable $join,
60+
) {
61+
parent::__construct($collection);
62+
$this->predicate = $predicate(...);
63+
$this->join = $join(...);
64+
}
65+
66+
/**
67+
* {@inheritdoc}
68+
*/
69+
protected function iterate(): \Traversable
70+
{
71+
/** @var Buffer $buffer */
72+
$buffer = [];
73+
/** @var Record|null $previous */
74+
$previous = null;
75+
76+
foreach ($this->collection as $key => $value) {
77+
if (0 === \count($buffer)) {
78+
$previous = [$key, $value];
79+
$buffer[] = $previous;
80+
continue;
81+
}
82+
83+
\assert(null !== $previous);
84+
85+
if (($this->predicate)([$previous[1], $value], [$previous[0], $key], $buffer)) {
86+
$previous = [$key, $value];
87+
$buffer[] = $previous;
88+
continue;
89+
}
90+
91+
yield from ($this->join)($buffer);
92+
$previous = [$key, $value];
93+
$buffer = [$previous];
94+
}
95+
96+
if (0 !== \count($buffer)) {
97+
yield from ($this->join)($buffer);
98+
}
99+
}
100+
}

src/RunOpenCode/Component/Dataset/src/Stream.php

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use function RunOpenCode\Component\Dataset\aggregate as dataset_aggregate;
1111
use function RunOpenCode\Component\Dataset\batch as dataset_batch;
1212
use function RunOpenCode\Component\Dataset\collect as dataset_collect;
13+
use function RunOpenCode\Component\Dataset\compress_join as dataset_compress_join;
1314
use function RunOpenCode\Component\Dataset\distinct as dataset_distinct;
1415
use function RunOpenCode\Component\Dataset\filter as dataset_filter;
1516
use function RunOpenCode\Component\Dataset\flatten as dataset_flatten;
@@ -80,6 +81,24 @@ public function batch(callable $onBatch, int $size = 1000): self
8081
return dataset_batch($this, $onBatch, $size);
8182
}
8283

84+
/**
85+
* Applies compress join operator on current stream.
86+
*
87+
* @template TModifiedKey
88+
* @template TModifiedValue
89+
*
90+
* @param callable(array{TValue, TValue}, array{TKey, TKey}=, list<array{TKey, TValue}>=): bool $predicate Callable predicate function to evaluate.
91+
* @param callable(list<array{TKey, TValue}>): iterable<TModifiedKey, TModifiedValue> $join Callable join function to produce joined records.
92+
*
93+
* @return self<TModifiedKey, TModifiedValue>
94+
*
95+
* @see Operator\CompressJoin
96+
*/
97+
public function compressJoin(callable $predicate, callable $join): self
98+
{
99+
return dataset_compress_join($this, $predicate, $join);
100+
}
101+
83102
/**
84103
* Applies distinct operator on current stream.
85104
*
@@ -346,4 +365,4 @@ protected function iterate(): \Traversable
346365
{
347366
yield from $this->collection;
348367
}
349-
}
368+
}

src/RunOpenCode/Component/Dataset/src/functions.php

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,29 @@ function batch(iterable $collection, callable $onBatch, int $size = 1000): Strea
8282
);
8383
}
8484

85+
/**
86+
* Create compress join operator.
87+
*
88+
* @template TKey
89+
* @template TValue
90+
* @template TModifiedKey
91+
* @template TModifiedValue
92+
*
93+
* @param iterable<TKey, TValue> $collection Collection to iterate over.
94+
* @param callable(array{TValue, TValue}, array{TKey, TKey}=, list<array{TKey, TValue}>=): bool $predicate Callable predicate function to evaluate.
95+
* @param callable(list<array{TKey, TValue}>): iterable<TModifiedKey, TModifiedValue> $join Callable join function to produce joined records.
96+
*
97+
* @return Stream<TModifiedKey, TModifiedValue>
98+
*
99+
* @see Operator\CompressJoin
100+
*/
101+
function compress_join(iterable $collection, callable $predicate, callable $join): Stream
102+
{
103+
return new Stream(
104+
new Operator\CompressJoin($collection, $predicate, $join)
105+
);
106+
}
107+
85108
/**
86109
* Create distinct operator.
87110
*
@@ -453,4 +476,4 @@ function reduce(iterable $collection, callable|string $reducer, mixed ...$args):
453476
}
454477

455478
return $reducer->value;
456-
}
479+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunOpenCode\Component\Dataset\Tests\Operator;
6+
7+
use PHPUnit\Framework\Attributes\Test;
8+
use PHPUnit\Framework\TestCase;
9+
use RunOpenCode\Component\Dataset\Operator\CompressJoin;
10+
11+
final class CompressJoinTest extends TestCase
12+
{
13+
#[Test]
14+
public function compress_join(): void
15+
{
16+
$operator = new CompressJoin(
17+
[
18+
1 => [10, 2],
19+
2 => [10, 3],
20+
3 => [10, 4],
21+
4 => [20, 1],
22+
5 => [20, 2],
23+
6 => [30, 5],
24+
],
25+
static fn(array $values): bool => $values[0][0] === $values[1][0],
26+
static fn(array $buffer): iterable => [
27+
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer),
28+
],
29+
);
30+
31+
$this->assertSame([
32+
10 => [2, 3, 4],
33+
20 => [1, 2],
34+
30 => [5],
35+
], \iterator_to_array($operator));
36+
}
37+
38+
#[Test]
39+
public function compress_join_with_single_element(): void
40+
{
41+
$operator = new CompressJoin(
42+
[
43+
1 => [10, 2],
44+
],
45+
static fn(array $values): bool => $values[0][0] === $values[1][0], // @phpstan-ignore-line
46+
static fn(array $buffer): iterable => [
47+
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer),
48+
],
49+
);
50+
51+
$this->assertSame([
52+
10 => [2],
53+
], \iterator_to_array($operator));
54+
}
55+
56+
#[Test]
57+
public function compress_join_with_empty(): void
58+
{
59+
$operator = new CompressJoin(
60+
[],
61+
static fn(array $values): bool => $values[0][0] === $values[1][0],
62+
static fn(array $buffer): iterable => [
63+
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer), //@phpstan-ignore-line
64+
],
65+
);
66+
67+
$this->assertSame([], \iterator_to_array($operator));
68+
}
69+
}

src/RunOpenCode/Component/Dataset/tests/StreamTest.php

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,35 @@ public function batch(): void
5353
], $data);
5454
}
5555

56+
#[Test]
57+
public function compressJoin(): void
58+
{
59+
$dataset = [
60+
1 => [10, 2],
61+
2 => [10, 3],
62+
3 => [10, 4],
63+
4 => [20, 1],
64+
5 => [20, 2],
65+
6 => [30, 5],
66+
];
67+
68+
$data = new Stream($dataset)
69+
->compressJoin(
70+
static fn(array $values): bool => $values[0][0] === $values[1][0],
71+
static fn(array $buffer): iterable => [
72+
$buffer[0][1][0] => \array_map(static fn(array $record): int => $record[1][1], $buffer),
73+
],
74+
)
75+
->collect(ArrayCollector::class)
76+
->value;
77+
78+
$this->assertSame([
79+
10 => [2, 3, 4],
80+
20 => [1, 2],
81+
30 => [5],
82+
], $data);
83+
}
84+
5685
#[Test]
5786
public function distinct(): void
5887
{
@@ -371,4 +400,4 @@ public function throws_exception_when_iterating_closed_stream(): void
371400
iterable_to_array($stream);
372401
\iterator_to_array($stream);
373402
}
374-
}
403+
}

0 commit comments

Comments
 (0)