Details | Last modification | View Log | RSS feed
Rev | Author | Line No. | Line |
---|---|---|---|
532 | rain-er | 1 | package Thread::Queue; |
2 | |||
3 | use strict; |
||
4 | use warnings; |
||
5 | |||
6 | our $VERSION = '2.11'; |
||
7 | |||
8 | use threads::shared 1.21; |
||
9 | use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); |
||
10 | |||
11 | # Carp errors from threads::shared calls should complain about caller |
||
12 | our @CARP_NOT = ("threads::shared"); |
||
13 | |||
14 | # Predeclarations for internal functions |
||
15 | my ($validate_count, $validate_index); |
||
16 | |||
# Constructor: build a queue, optionally seeded with an initial list of items.
#
#   $class - class name to bless the queue into
#   LIST   - zero or more items; each is passed through shared_clone()
#            before being stored
#
# The queue is an array declared ':shared'; a blessed reference to it is
# returned.
sub new
{
    my ($class, @initial) = @_;

    my @queue :shared;
    push(@queue, shared_clone($_)) for @initial;

    return bless(\@queue, $class);
}
||
24 | |||
# Append items to the tail of the queue.
#
#   LIST - items to add; each is passed through shared_clone() first
#
# The queue is locked for the duration of the call.  cond_signal() is only
# invoked when push() reports a non-zero queue length, so enqueueing nothing
# onto an empty queue signals nobody.  Returns push()'s result when that is
# false, otherwise whatever cond_signal() returns.
sub enqueue
{
    my $queue = shift;
    lock(@$queue);

    my @incoming = map { shared_clone($_) } @_;
    my $length   = push(@$queue, @incoming);

    return $length && cond_signal(@$queue);
}
||
33 | |||
# Report how many items are currently on the queue.
# The queue is locked while the count is taken.
sub pending
{
    my ($queue) = @_;
    lock(@$queue);

    my $remaining = @$queue;    # Array in scalar context: element count
    return $remaining;
}
||
41 | |||
# Remove and return items from the head of the queue, blocking until the
# requested number is available.
#
#   COUNT - optional number of items wanted (checked by $validate_count);
#           defaults to 1
#
# Returns the single item when COUNT is 1, otherwise the list of items.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Sleep on the queue's condition until it holds enough items
    while (@$queue < $count) {
        cond_wait(@$queue);
    }

    # Items will remain after we take ours, so signal once more
    if (@$queue > $count) {
        cond_signal(@$queue);
    }

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested
    my @items;
    while (@items < $count) {
        push(@items, shift(@$queue));
    }
    return @items;
}
||
62 | |||
# Remove and return items from the head of the queue without ever blocking.
#
#   COUNT - optional number of items wanted (checked by $validate_count);
#           defaults to 1
#
# With COUNT of 1 the result of shift() is returned directly (undef when the
# queue is empty).  Otherwise up to COUNT items are returned as a list, which
# is shorter than COUNT (possibly empty) if the queue runs out.
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested: stop at COUNT or when the queue is drained
    my @items;
    while (@$queue && (@items < $count)) {
        push(@items, shift(@$queue));
    }
    return @items;
}
||
82 | |||
# Look at an item on the queue without removing it.
#
#   INDEX - optional position (checked by $validate_index); defaults to 0,
#           the head of the queue
#
# Returns the array element at INDEX.
sub peek
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    $index = $validate_index->(shift) if (@_);

    return $queue->[$index];
}
||
91 | |||
# Insert items at an arbitrary position in the queue.
#
#   INDEX - position to insert at (checked by $validate_index); a negative
#           value counts back from the tail and is clamped to the head
#   LIST  - items to insert; each is passed through shared_clone()
#
# Returns early, changing nothing, when LIST is empty.  Otherwise existing
# items at and after INDEX end up behind the new ones and cond_signal() is
# called on the queue.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    # Nothing to insert
    return if (! @_);

    # Convert a negative index to an offset from the head, stopping at 0
    $index += @$queue if ($index < 0);
    $index = 0        if ($index < 0);

    # Set aside everything from $index onward, preserving order.
    # NOTE(review): presumably done with pop/unshift rather than splice()
    # because splice is not supported on shared arrays -- confirm against
    # the threads::shared documentation.
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $index);

    # New items go on first, then the items that were set aside
    my @incoming = map { shared_clone($_) } @_;
    push(@$queue, @incoming);
    push(@$queue, @tail);

    # Soup's up
    cond_signal(@$queue);
}
||
125 | |||
# Remove and return items from an arbitrary position in the queue, without
# blocking.
#
#   INDEX - optional start position (checked by $validate_index); defaults
#           to 0.  A negative value counts back from the tail.
#   COUNT - optional number of items (checked by $validate_count); defaults
#           to 1
#
# When a negative INDEX reaches back past the head, COUNT is reduced by the
# overshoot: nothing is returned if no count is left, otherwise the request
# is handed to dequeue_nb() with the reduced count.
#
# Otherwise returns the first extracted item when COUNT is 1, or the list of
# extracted items (fewer than COUNT if the queue is too short).
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my ($index, $count) = (0, 1);
    $index = $validate_index->(shift) if (@_);
    $count = $validate_count->(shift) if (@_);

    # Convert a negative index to an offset from the head
    $index += @$queue if ($index < 0);

    if ($index < 0) {
        # Still negative: the request starts before the head of the queue
        $count += $index;
        return if ($count <= 0);            # Entirely before the head
        return $queue->dequeue_nb($count);  # Overlaps the head
    }

    # Set aside the items that lie beyond the requested range.
    # NOTE(review): presumably done with pop/unshift rather than splice()
    # because splice is not supported on shared arrays -- confirm against
    # the threads::shared documentation.
    my $stop = $index + $count;
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $stop);

    # Whatever is left from $index onward is what the caller asked for
    my @items;
    while (@$queue > $index) {
        unshift(@items, pop(@$queue));
    }

    # Restore the items that were set aside
    push(@$queue, @tail);

    # Single item requested
    if ($count == 1) {
        return $items[0];
    }

    # Several items requested
    return @items;
}
||
164 | |||
165 | ### Internal Functions ### |
||
166 | |||
# Validate an 'index' argument on behalf of a public method.
#
# Accepts a defined value that looks like a number and equals its own int()
# truncation, and returns it unchanged.  Anything else croaks, naming the
# calling method (taken from caller(1), with the package prefix removed).
$validate_index = sub {
    my $index = shift;

    my $is_integer = defined($index)
                  && looks_like_number($index)
                  && (int($index) == $index);
    return $index if ($is_integer);

    # Carp is only loaded when there is actually an error to report
    require Carp;
    my ($method) = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $index = 'undef' if (! defined($index));
    Carp::croak("Invalid 'index' argument ($index) to '$method' method");
};
||
184 | |||
# Validate a 'count' argument on behalf of a public method.
#
# Accepts a defined value that looks like a number, equals its own int()
# truncation and is at least 1, and returns it unchanged.  Anything else
# croaks, naming the calling method (taken from caller(1), with the package
# prefix removed).
$validate_count = sub {
    my $count = shift;

    my $is_positive_integer = defined($count)
                           && looks_like_number($count)
                           && (int($count) == $count)
                           && ($count >= 1);
    return $count if ($is_positive_integer);

    # Carp is only loaded when there is actually an error to report
    require Carp;
    my ($method) = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $count = 'undef' if (! defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
};
||
203 | |||
204 | 1; |
||
205 | |||
206 | =head1 NAME |
||
207 | |||
208 | Thread::Queue - Thread-safe queues |
||
209 | |||
210 | =head1 VERSION |
||
211 | |||
212 | This document describes Thread::Queue version 2.11 |
||
213 | |||
214 | =head1 SYNOPSIS |
||
215 | |||
216 | use strict; |
||
217 | use warnings; |
||
218 | |||
219 | use threads; |
||
220 | use Thread::Queue; |
||
221 | |||
222 | my $q = Thread::Queue->new(); # A new empty queue |
||
223 | |||
224 | # Worker thread |
||
225 | my $thr = threads->create(sub { |
||
226 | while (my $item = $q->dequeue()) { |
||
227 | # Do work on $item |
||
228 | } |
||
229 | })->detach(); |
||
230 | |||
231 | # Send work to the thread |
||
232 | $q->enqueue($item1, ...); |
||
233 | |||
234 | |||
235 | # Count of items in the queue |
||
236 | my $left = $q->pending(); |
||
237 | |||
238 | # Non-blocking dequeue |
||
239 | if (defined(my $item = $q->dequeue_nb())) { |
||
240 | # Work on $item |
||
241 | } |
||
242 | |||
243 | # Get the second item in the queue without dequeuing anything |
||
244 | my $item = $q->peek(1); |
||
245 | |||
246 | # Insert two items into the queue just behind the head |
||
247 | $q->insert(1, $item1, $item2); |
||
248 | |||
249 | # Extract the last two items on the queue |
||
250 | my ($item1, $item2) = $q->extract(-2, 2); |
||
251 | |||
252 | =head1 DESCRIPTION |
||
253 | |||
254 | This module provides thread-safe FIFO queues that can be accessed safely by |
||
255 | any number of threads. |
||
256 | |||
257 | Any data types supported by L<threads::shared> can be passed via queues: |
||
258 | |||
259 | =over |
||
260 | |||
261 | =item Ordinary scalars |
||
262 | |||
263 | =item Array refs |
||
264 | |||
265 | =item Hash refs |
||
266 | |||
267 | =item Scalar refs |
||
268 | |||
269 | =item Objects based on the above |
||
270 | |||
271 | =back |
||
272 | |||
273 | Ordinary scalars are added to queues as they are. |
||
274 | |||
275 | If not already thread-shared, the other complex data types will be cloned |
||
276 | (recursively, if needed, and including any C<bless>ings and read-only |
||
277 | settings) into thread-shared structures before being placed onto a queue. |
||
278 | |||
279 | For example, the following would cause L<Thread::Queue> to create an empty, |
||
280 | shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' |
||
281 | and 'baz' from C<@ary> into it, and then place that shared reference onto |
||
282 | the queue: |
||
283 | |||
284 | my @ary = qw/foo bar baz/; |
||
285 | $q->enqueue(\@ary); |
||
286 | |||
287 | However, for the following, the items are already shared, so their references |
||
288 | are added directly to the queue, and no cloning takes place: |
||
289 | |||
290 | my @ary :shared = qw/foo bar baz/; |
||
291 | $q->enqueue(\@ary); |
||
292 | |||
293 | my $obj = &shared({}); |
||
294 | $$obj{'foo'} = 'bar'; |
||
295 | $$obj{'qux'} = 99; |
||
296 | bless($obj, 'My::Class'); |
||
297 | $q->enqueue($obj); |
||
298 | |||
299 | See L</"LIMITATIONS"> for caveats related to passing objects via queues. |
||
300 | |||
301 | =head1 QUEUE CREATION |
||
302 | |||
303 | =over |
||
304 | |||
305 | =item ->new() |
||
306 | |||
307 | Creates a new empty queue. |
||
308 | |||
309 | =item ->new(LIST) |
||
310 | |||
311 | Creates a new queue pre-populated with the provided list of items. |
||
312 | |||
313 | =back |
||
314 | |||
315 | =head1 BASIC METHODS |
||
316 | |||
317 | The following methods deal with queues on a FIFO basis. |
||
318 | |||
319 | =over |
||
320 | |||
321 | =item ->enqueue(LIST) |
||
322 | |||
323 | Adds a list of items onto the end of the queue. |
||
324 | |||
325 | =item ->dequeue() |
||
326 | |||
327 | =item ->dequeue(COUNT) |
||
328 | |||
329 | Removes the requested number of items (default is 1) from the head of the |
||
330 | queue, and returns them. If the queue contains fewer than the requested |
||
331 | number of items, then the thread will be blocked until the requisite number |
||
332 | of items are available (i.e., until other threads C<enqueue> more items). |
||
333 | |||
334 | =item ->dequeue_nb() |
||
335 | |||
336 | =item ->dequeue_nb(COUNT) |
||
337 | |||
338 | Removes the requested number of items (default is 1) from the head of the |
||
339 | queue, and returns them. If the queue contains fewer than the requested |
||
340 | number of items, then it immediately (i.e., non-blocking) returns whatever |
||
341 | items there are on the queue. If the queue is empty, then C<undef> is |
||
342 | returned. |
||
343 | |||
344 | =item ->pending() |
||
345 | |||
346 | Returns the number of items still in the queue. |
||
347 | |||
348 | =back |
||
349 | |||
350 | =head1 ADVANCED METHODS |
||
351 | |||
352 | The following methods can be used to manipulate items anywhere in a queue. |
||
353 | |||
354 | To prevent the contents of a queue from being modified by another thread |
||
355 | while it is being examined and/or changed, L<lock|threads::shared/"lock |
||
356 | VARIABLE"> the queue inside a local block: |
||
357 | |||
358 | { |
||
359 | lock($q); # Keep other threads from changing the queue's contents |
||
360 | my $item = $q->peek(); |
||
361 | if ($item ...) { |
||
362 | ... |
||
363 | } |
||
364 | } |
||
365 | # Queue is now unlocked |
||
366 | |||
367 | =over |
||
368 | |||
369 | =item ->peek() |
||
370 | |||
371 | =item ->peek(INDEX) |
||
372 | |||
373 | Returns an item from the queue without dequeuing anything. Defaults to the |
||
374 | head of the queue (at index position 0) if no index is specified. Negative |
||
375 | index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 |
||
376 | is the end of the queue, -2 is next to last, and so on). |
||
377 | |||
378 | If no item exists at the specified index (i.e., the queue is empty, or the |
||
379 | index is beyond the number of items on the queue), then C<undef> is returned. |
||
380 | |||
381 | Remember, the returned item is not removed from the queue, so manipulating a |
||
382 | C<peek>ed at reference affects the item on the queue. |
||
383 | |||
384 | =item ->insert(INDEX, LIST) |
||
385 | |||
386 | Adds the list of items to the queue at the specified index position (0 |
||
387 | is the head of the list). Any existing items at and beyond that position are |
||
388 | pushed back past the newly added items: |
||
389 | |||
390 | $q->enqueue(1, 2, 3, 4); |
||
391 | $q->insert(1, qw/foo bar/); |
||
392 | # Queue now contains: 1, foo, bar, 2, 3, 4 |
||
393 | |||
394 | Specifying an index position greater than the number of items in the queue |
||
395 | just adds the list to the end. |
||
396 | |||
397 | Negative index positions are supported: |
||
398 | |||
399 | $q->enqueue(1, 2, 3, 4); |
||
400 | $q->insert(-2, qw/foo bar/); |
||
401 | # Queue now contains: 1, 2, foo, bar, 3, 4 |
||
402 | |||
403 | Specifying a negative index position greater than the number of items in the |
||
404 | queue adds the list to the head of the queue. |
||
405 | |||
406 | =item ->extract() |
||
407 | |||
408 | =item ->extract(INDEX) |
||
409 | |||
410 | =item ->extract(INDEX, COUNT) |
||
411 | |||
412 | Removes and returns the specified number of items (defaults to 1) from the |
||
413 | specified index position in the queue (0 is the head of the queue). When |
||
414 | called with no arguments, C<extract> operates the same as C<dequeue_nb>. |
||
415 | |||
416 | This method is non-blocking, and will return only as many items as are |
||
417 | available to fulfill the request: |
||
418 | |||
419 | $q->enqueue(1, 2, 3, 4); |
||
420 | my $item = $q->extract(2); # Returns 3 |
||
421 | # Queue now contains: 1, 2, 4 |
||
422 | my @items = $q->extract(1, 3); # Returns (2, 4) |
||
423 | # Queue now contains: 1 |
||
424 | |||
425 | Specifying an index position greater than the number of items in the |
||
426 | queue results in C<undef> or an empty list being returned. |
||
427 | |||
428 | $q->enqueue('foo'); |
||
429 | my $nada = $q->extract(3); # Returns undef |
||
430 | my @nada = $q->extract(1, 3); # Returns () |
||
431 | |||
432 | Negative index positions are supported. Specifying a negative index position |
||
433 | greater than the number of items in the queue may return items from the head |
||
434 | of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the |
||
435 | queue from the specified position (i.e. if queue size + index + count is |
||
436 | greater than zero): |
||
437 | |||
438 | $q->enqueue(qw/foo bar baz/); |
||
439 | my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
||
440 | my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
||
441 | # Queue now contains: bar, baz |
||
442 | my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 |
||
443 | |||
444 | =back |
||
445 | |||
446 | =head1 NOTES |
||
447 | |||
448 | Queues created by L<Thread::Queue> can be used in both threaded and |
||
449 | non-threaded applications. |
||
450 | |||
451 | =head1 LIMITATIONS |
||
452 | |||
453 | Passing objects on queues may not work if the objects' classes do not support |
||
454 | sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. |
||
455 | |||
456 | Passing array/hash refs that contain objects may not work for Perl prior to |
||
457 | 5.10.0. |
||
458 | |||
459 | =head1 SEE ALSO |
||
460 | |||
461 | Thread::Queue Discussion Forum on CPAN: |
||
462 | L<http://www.cpanforum.com/dist/Thread-Queue> |
||
463 | |||
464 | Annotated POD for Thread::Queue: |
||
465 | L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm> |
||
466 | |||
467 | Source repository: |
||
468 | L<http://code.google.com/p/thread-queue/> |
||
469 | |||
470 | L<threads>, L<threads::shared> |
||
471 | |||
472 | =head1 MAINTAINER |
||
473 | |||
474 | Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> |
||
475 | |||
476 | =head1 LICENSE |
||
477 | |||
478 | This program is free software; you can redistribute it and/or modify it under |
||
479 | the same terms as Perl itself. |
||
480 | |||
481 | =cut |