Subversion Repositories Projects

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
365 rain-er 1
package Thread::Queue;
2
 
3
use strict;
4
use warnings;
5
 
6
our $VERSION = '2.11';
7
 
8
use threads::shared 1.21;
9
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
10
 
11
# Carp errors from threads::shared calls should complain about caller
12
our @CARP_NOT = ("threads::shared");
13
 
14
# Predeclarations for internal functions
15
my ($validate_count, $validate_index);
16
 
17
# Create a new queue possibly pre-populated with items
18
sub new
19
{
20
    my $class = shift;
21
    my @queue :shared = map { shared_clone($_) } @_;
22
    return bless(\@queue, $class);
23
}
24
 
25
# Add items to the tail of a queue
26
sub enqueue
27
{
28
    my $queue = shift;
29
    lock(@$queue);
30
    push(@$queue, map { shared_clone($_) } @_)
31
        and cond_signal(@$queue);
32
}
33
 
34
# Return a count of the number of items on a queue
35
sub pending
36
{
37
    my $queue = shift;
38
    lock(@$queue);
39
    return scalar(@$queue);
40
}
41
 
42
# Return 1 or more items from the head of a queue, blocking if needed
43
sub dequeue
44
{
45
    my $queue = shift;
46
    lock(@$queue);
47
 
48
    my $count = @_ ? $validate_count->(shift) : 1;
49
 
50
    # Wait for requisite number of items
51
    cond_wait(@$queue) until (@$queue >= $count);
52
    cond_signal(@$queue) if (@$queue > $count);
53
 
54
    # Return single item
55
    return shift(@$queue) if ($count == 1);
56
 
57
    # Return multiple items
58
    my @items;
59
    push(@items, shift(@$queue)) for (1..$count);
60
    return @items;
61
}
62
 
63
# Return items from the head of a queue with no blocking
64
sub dequeue_nb
65
{
66
    my $queue = shift;
67
    lock(@$queue);
68
 
69
    my $count = @_ ? $validate_count->(shift) : 1;
70
 
71
    # Return single item
72
    return shift(@$queue) if ($count == 1);
73
 
74
    # Return multiple items
75
    my @items;
76
    for (1..$count) {
77
        last if (! @$queue);
78
        push(@items, shift(@$queue));
79
    }
80
    return @items;
81
}
82
 
83
# Return an item without removing it from a queue
84
sub peek
85
{
86
    my $queue = shift;
87
    lock(@$queue);
88
    my $index = @_ ? $validate_index->(shift) : 0;
89
    return $$queue[$index];
90
}
91
 
92
# Insert items anywhere into a queue
93
sub insert
94
{
95
    my $queue = shift;
96
    lock(@$queue);
97
 
98
    my $index = $validate_index->(shift);
99
 
100
    return if (! @_);   # Nothing to insert
101
 
102
    # Support negative indices
103
    if ($index < 0) {
104
        $index += @$queue;
105
        if ($index < 0) {
106
            $index = 0;
107
        }
108
    }
109
 
110
    # Dequeue items from $index onward
111
    my @tmp;
112
    while (@$queue > $index) {
113
        unshift(@tmp, pop(@$queue))
114
    }
115
 
116
    # Add new items to the queue
117
    push(@$queue, map { shared_clone($_) } @_);
118
 
119
    # Add previous items back onto the queue
120
    push(@$queue, @tmp);
121
 
122
    # Soup's up
123
    cond_signal(@$queue);
124
}
125
 
126
# Remove items from anywhere in a queue
127
sub extract
128
{
129
    my $queue = shift;
130
    lock(@$queue);
131
 
132
    my $index = @_ ? $validate_index->(shift) : 0;
133
    my $count = @_ ? $validate_count->(shift) : 1;
134
 
135
    # Support negative indices
136
    if ($index < 0) {
137
        $index += @$queue;
138
        if ($index < 0) {
139
            $count += $index;
140
            return if ($count <= 0);            # Beyond the head of the queue
141
            return $queue->dequeue_nb($count);  # Extract from the head
142
        }
143
    }
144
 
145
    # Dequeue items from $index+$count onward
146
    my @tmp;
147
    while (@$queue > ($index+$count)) {
148
        unshift(@tmp, pop(@$queue))
149
    }
150
 
151
    # Extract desired items
152
    my @items;
153
    unshift(@items, pop(@$queue)) while (@$queue > $index);
154
 
155
    # Add back any removed items
156
    push(@$queue, @tmp);
157
 
158
    # Return single item
159
    return $items[0] if ($count == 1);
160
 
161
    # Return multiple items
162
    return @items;
163
}
164
 
165
### Internal Functions ###
166
 
167
# Check value of the requested index
168
$validate_index = sub {
169
    my $index = shift;
170
 
171
    if (! defined($index) ||
172
        ! looks_like_number($index) ||
173
        (int($index) != $index))
174
    {
175
        require Carp;
176
        my ($method) = (caller(1))[3];
177
        $method =~ s/Thread::Queue:://;
178
        $index = 'undef' if (! defined($index));
179
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
180
    }
181
 
182
    return $index;
183
};
184
 
185
# Check value of the requested count
186
$validate_count = sub {
187
    my $count = shift;
188
 
189
    if (! defined($count) ||
190
        ! looks_like_number($count) ||
191
        (int($count) != $count) ||
192
        ($count < 1))
193
    {
194
        require Carp;
195
        my ($method) = (caller(1))[3];
196
        $method =~ s/Thread::Queue:://;
197
        $count = 'undef' if (! defined($count));
198
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
199
    }
200
 
201
    return $count;
202
};
203
 
204
1;
205
 
206
=head1 NAME
207
 
208
Thread::Queue - Thread-safe queues
209
 
210
=head1 VERSION
211
 
212
This document describes Thread::Queue version 2.11
213
 
214
=head1 SYNOPSIS
215
 
216
    use strict;
217
    use warnings;
218
 
219
    use threads;
220
    use Thread::Queue;
221
 
222
    my $q = Thread::Queue->new();    # A new empty queue
223
 
224
    # Worker thread
225
    my $thr = threads->create(sub {
226
                                while (my $item = $q->dequeue()) {
227
                                    # Do work on $item
228
                                }
229
                             })->detach();
230
 
231
    # Send work to the thread
232
    $q->enqueue($item1, ...);
233
 
234
 
235
    # Count of items in the queue
236
    my $left = $q->pending();
237
 
238
    # Non-blocking dequeue
239
    if (defined(my $item = $q->dequeue_nb())) {
240
        # Work on $item
241
    }
242
 
243
    # Get the second item in the queue without dequeuing anything
244
    my $item = $q->peek(1);
245
 
246
    # Insert two items into the queue just behind the head
247
    $q->insert(1, $item1, $item2);
248
 
249
    # Extract the last two items on the queue
250
    my ($item1, $item2) = $q->extract(-2, 2);
251
 
252
=head1 DESCRIPTION
253
 
254
This module provides thread-safe FIFO queues that can be accessed safely by
255
any number of threads.
256
 
257
Any data types supported by L<threads::shared> can be passed via queues:
258
 
259
=over
260
 
261
=item Ordinary scalars
262
 
263
=item Array refs
264
 
265
=item Hash refs
266
 
267
=item Scalar refs
268
 
269
=item Objects based on the above
270
 
271
=back
272
 
273
Ordinary scalars are added to queues as they are.
274
 
275
If not already thread-shared, the other complex data types will be cloned
276
(recursively, if needed, and including any C<bless>ings and read-only
277
settings) into thread-shared structures before being placed onto a queue.
278
 
279
For example, the following would cause L<Thread::Queue> to create a empty,
280
shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
281
and 'baz' from C<@ary> into it, and then place that shared reference onto
282
the queue:
283
 
284
    my @ary = qw/foo bar baz/;
285
    $q->enqueue(\@ary);
286
 
287
However, for the following, the items are already shared, so their references
288
are added directly to the queue, and no cloning takes place:
289
 
290
    my @ary :shared = qw/foo bar baz/;
291
    $q->enqueue(\@ary);
292
 
293
    my $obj = &shared({});
294
    $$obj{'foo'} = 'bar';
295
    $$obj{'qux'} = 99;
296
    bless($obj, 'My::Class');
297
    $q->enqueue($obj);
298
 
299
See L</"LIMITATIONS"> for caveats related to passing objects via queues.
300
 
301
=head1 QUEUE CREATION
302
 
303
=over
304
 
305
=item ->new()
306
 
307
Creates a new empty queue.
308
 
309
=item ->new(LIST)
310
 
311
Creates a new queue pre-populated with the provided list of items.
312
 
313
=back
314
 
315
=head1 BASIC METHODS
316
 
317
The following methods deal with queues on a FIFO basis.
318
 
319
=over
320
 
321
=item ->enqueue(LIST)
322
 
323
Adds a list of items onto the end of the queue.
324
 
325
=item ->dequeue()
326
 
327
=item ->dequeue(COUNT)
328
 
329
Removes the requested number of items (default is 1) from the head of the
330
queue, and returns them.  If the queue contains fewer than the requested
331
number of items, then the thread will be blocked until the requisite number
332
of items are available (i.e., until other threads <enqueue> more items).
333
 
334
=item ->dequeue_nb()
335
 
336
=item ->dequeue_nb(COUNT)
337
 
338
Removes the requested number of items (default is 1) from the head of the
339
queue, and returns them.  If the queue contains fewer than the requested
340
number of items, then it immediately (i.e., non-blocking) returns whatever
341
items there are on the queue.  If the queue is empty, then C<undef> is
342
returned.
343
 
344
=item ->pending()
345
 
346
Returns the number of items still in the queue.
347
 
348
=back
349
 
350
=head1 ADVANCED METHODS
351
 
352
The following methods can be used to manipulate items anywhere in a queue.
353
 
354
To prevent the contents of a queue from being modified by another thread
355
while it is being examined and/or changed, L<lock|threads::shared/"lock
356
VARIABLE"> the queue inside a local block:
357
 
358
    {
359
        lock($q);   # Keep other threads from changing the queue's contents
360
        my $item = $q->peek();
361
        if ($item ...) {
362
            ...
363
        }
364
    }
365
    # Queue is now unlocked
366
 
367
=over
368
 
369
=item ->peek()
370
 
371
=item ->peek(INDEX)
372
 
373
Returns an item from the queue without dequeuing anything.  Defaults to the
374
the head of queue (at index position 0) if no index is specified.  Negative
375
index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
376
is the end of the queue, -2 is next to last, and so on).
377
 
378
If no items exists at the specified index (i.e., the queue is empty, or the
379
index is beyond the number of items on the queue), then C<undef> is returned.
380
 
381
Remember, the returned item is not removed from the queue, so manipulating a
382
C<peek>ed at reference affects the item on the queue.
383
 
384
=item ->insert(INDEX, LIST)
385
 
386
Adds the list of items to the queue at the specified index position (0
387
is the head of the list).  Any existing items at and beyond that position are
388
pushed back past the newly added items:
389
 
390
    $q->enqueue(1, 2, 3, 4);
391
    $q->insert(1, qw/foo bar/);
392
    # Queue now contains:  1, foo, bar, 2, 3, 4
393
 
394
Specifying an index position greater than the number of items in the queue
395
just adds the list to the end.
396
 
397
Negative index positions are supported:
398
 
399
    $q->enqueue(1, 2, 3, 4);
400
    $q->insert(-2, qw/foo bar/);
401
    # Queue now contains:  1, 2, foo, bar, 3, 4
402
 
403
Specifying a negative index position greater than the number of items in the
404
queue adds the list to the head of the queue.
405
 
406
=item ->extract()
407
 
408
=item ->extract(INDEX)
409
 
410
=item ->extract(INDEX, COUNT)
411
 
412
Removes and returns the specified number of items (defaults to 1) from the
413
specified index position in the queue (0 is the head of the queue).  When
414
called with no arguments, C<extract> operates the same as C<dequeue_nb>.
415
 
416
This method is non-blocking, and will return only as many items as are
417
available to fulfill the request:
418
 
419
    $q->enqueue(1, 2, 3, 4);
420
    my $item  = $q->extract(2)     # Returns 3
421
                                   # Queue now contains:  1, 2, 4
422
    my @items = $q->extract(1, 3)  # Returns (2, 4)
423
                                   # Queue now contains:  1
424
 
425
Specifying an index position greater than the number of items in the
426
queue results in C<undef> or an empty list being returned.
427
 
428
    $q->enqueue('foo');
429
    my $nada = $q->extract(3)      # Returns undef
430
    my @nada = $q->extract(1, 3)   # Returns ()
431
 
432
Negative index positions are supported.  Specifying a negative index position
433
greater than the number of items in the queue may return items from the head
434
of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
435
queue from the specified position (i.e. if queue size + index + count is
436
greater than zero):
437
 
438
    $q->enqueue(qw/foo bar baz/);
439
    my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
440
    my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
441
                                     # Queue now contains:  bar, baz
442
    my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
443
 
444
=back
445
 
446
=head1 NOTES
447
 
448
Queues created by L<Thread::Queue> can be used in both threaded and
449
non-threaded applications.
450
 
451
=head1 LIMITATIONS
452
 
453
Passing objects on queues may not work if the objects' classes do not support
454
sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
455
 
456
Passing array/hash refs that contain objects may not work for Perl prior to
457
5.10.0.
458
 
459
=head1 SEE ALSO
460
 
461
Thread::Queue Discussion Forum on CPAN:
462
L<http://www.cpanforum.com/dist/Thread-Queue>
463
 
464
Annotated POD for Thread::Queue:
465
L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm>
466
 
467
Source repository:
468
L<http://code.google.com/p/thread-queue/>
469
 
470
L<threads>, L<threads::shared>
471
 
472
=head1 MAINTAINER
473
 
474
Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
475
 
476
=head1 LICENSE
477
 
478
This program is free software; you can redistribute it and/or modify it under
479
the same terms as Perl itself.
480
 
481
=cut