root/MPI/mpi_iprscan

Revision 283, 26.2 kB (checked in by cholt, 5 days ago)

mpi_iprscan fixes

  • Property svn:executable set to *
Line 
1 #!/usr/bin/perl
2
3 eval 'exec /usr/bin/perl -w -S $0 ${1+"$@"}'
4     if 0; # not running under some shell
5
6 use strict "vars";
7 use strict "refs";
8
9 use FindBin;
10 use lib "$FindBin::Bin/../lib";
11 use lib "$FindBin::Bin/../perl/lib";
12 use vars qw($RANK $LOG $EXE $TMP);
13 use Storable qw(freeze thaw);
14 use threads;
15 use threads::shared;
16
17 BEGIN{
18     #find iprscan
19     $EXE = `which iprscan 2> /dev/null`;
20     chomp $EXE;
21     if (! $EXE || $EXE =~ /^no iprscan/) {
22         die "ERROR: You must have iprscan installed and in you path to use mpi_iprscan\n";
23     }
24
25     #build temp directory
26     my $n = umask();
27     umask(0000);
28     $TMP = "/tmp/iprscanTMP";
29     mkdir($TMP, 07777) if(! -d $TMP);
30     mkdir("$TMP/tmp", 07777) if(! -d "$TMP/tmp");
31     umask($n);
32
33    #what to do on ^C
34     $SIG{'INT'} = sub {
35         print STDERR "\n\nmpi_iprscan aborted by user!!\n\n";
36         my @threads = threads->list;
37         foreach my $thr (@threads){
38             $thr->detach;
39         }
40         exit (1);
41     };
42    
43     #supress warnings from storable module
44     $SIG{'__WARN__'} = sub {
45         warn $_[0] if ( $_[0] !~ /Not a CODE reference/ &&
46                         $_[0] !~ /Can\'t store item CODE/
47                        );
48     };
49    
50     #output to log file of seq that caused rank to die
51     $SIG{'__DIE__'} = sub {
52         if (defined ($LOG) && defined $_[0]) {
53             my $die_count = $LOG->get_die_count();
54             $die_count++;
55             $LOG->add_entry("DIED","RANK",$RANK);
56             $LOG->add_entry("DIED","COUNT",$die_count);
57         }
58         my @threads = threads->list;
59         foreach my $thr (@threads){
60             $thr->detach;
61         }
62        
63         die $_[0];
64     };
65 }
66
67 use Cwd;
68 use Storable;
69 use FileHandle;
70 use File::Copy;
71 use File::Path;
72 use Getopt::Long qw(:config pass_through);
73 use File::Temp qw(tempfile tempdir);
74 use Iterator::Fasta;
75 use Fasta;
76 use ds_utility;
77 use Error qw(:try);
78 use Error::Simple;
79 use Process::IPRchunk;
80 use Process::MpiTiers;
81 use Process::IPRchunk;
82 use Parallel::MPIcar qw(:all);
83 use iprscan::runlog;
84
85 unless($threads::VERSION >= 1.67){
86    die "mpi_iprscan requires threads version 1.67 or greater\n",
87    "You have version ". $threads::VERSION ."\n";
88 }
89
90 #--MPI_Init requires there to be arguments on @ARGV
91 #--This is a logic problem by the Package Authors
92 #--This is a hack to solve the problem
93 if (not @ARGV) {
94     push (@ARGV, 'null');
95     MPI_Init(); #initiate the MPI
96     shift @ARGV;
97 }
98 else {
99     MPI_Init(); #initiate the MPI
100 }
101
102 select((select(STDOUT), $|=1)[0]);  #make STDOUT buffer flush immediately
103
104 #------------------------------------------------------------------------------------                                               
105 #-----------------------------------------MAIN---------------------------------------                                               
106 #------------------------------------------------------------------------------------
107
108 #--set object variables for serialization of data
109 $Storable::forgive_me = 1; #allows serializaion of objects with code refs
110
111 #------INITIATE MPI VARIABLES------
112 my $rank = MPI_Comm_rank(MPI_COMM_WORLD); #my proccess number
113 my $size = MPI_Comm_size(MPI_COMM_WORLD); #how many proccesses
114 $RANK = $rank;
115
116 #MPI SIGNAL CODES
117 #--mpi message tags
118 my $who_I_am       = 1111;
119 my $what_I_want    = 2222;
120 my $result_status  = 3333;
121 my $request_status = 4444;
122 my $c_res_status   = 5555;
123 my $chunk_status   = 6666;
124 my $work_order     = 7777; #generic data tag
125 my $mpi_data       = 8888;
126 my $message_length = 9999;
127
128 #--what_I_want type signals
129 my $need_tier   = 1;
130 my $need_helper = 2;
131 my $have_c_res  = 3;
132 my $need_c_res  = 4;
133
134 #--request_status signals
135 my $wait_as_helper = 1;
136 my $yes_tier       = 2;
137 my $yes_helper     = 3;
138 my $no_helper      = 4;
139 my $go_chunk       = 5;
140 my $reset          = 6;
141 my $terminate      = 0;
142
143 #--results_status signals
144 my $yes_result = 1;
145 my $no_result  = 0;
146
147 #--c_res_status signal
148 my $yes_c_res      = 1;
149 my $no_c_res      = 0;
150
151 #--chunk_status signals
152 my $yes_chunk = 1;
153 my $no_chunk  = 0;
154
155 #---variables for thread and the root node
156 my @c_results;
157 my @failed;
158 my @res_loc;
159 my @helper_stack;
160 my @active;
161 my @chunks : shared;
162 my @returned_chunks :shared;
163 my $t_need_flag :shared;
164 my $t_tier :shared;
165 my $t_tier_result :shared;
166 my $t_chunk :shared;
167 my $t_chunk_result :shared;
168 my $t_terminate :shared;
169
170 #---global variables
171 my %CTL_OPT;
172 my @appl;
173
174 my $root = 0; #define root node (only changed for debugging)
175
176 $CTL_OPT{retry} = 2;
177 $CTL_OPT{datastore} = 1;
178 $CTL_OPT{always_try} = 1;
179 $CTL_OPT{seqtype} = 'p'; #default
180
181 #---Process options on the command line
182 GetOptions("i=s"       => \$CTL_OPT{infile},
183            "o=s"       => \$CTL_OPT{outfile},
184            "appl=s"    => \@appl,
185            "nocrc"     => \$CTL_OPT{nocrc},
186            "seqtype=s" => \$CTL_OPT{seqtype},
187            "trtable=i" => \$CTL_OPT{trtable},
188            "goterms"   => \$CTL_OPT{goterms},
189            "iprlookup" => \$CTL_OPT{iprlookup},
190            "format=s"  => \$CTL_OPT{format},
191            "verbose"   => \$CTL_OPT{verbose},
192            "retry"     => \$CTL_OPT{retry},
193            "chpc"      => \$CTL_OPT{chpc}, #hidden option for retrying
194            "cli"       => \$CTL_OPT{cli}, #just used to strip off the option
195            );
196
197 $main::quiet = 1 unless($CTL_OPT{verbose}); #suppress status messages
198
199 if(! @appl){
200     my $conf = Cwd::abs_path($EXE);
201     $conf =~ s/[^\/]+$//;
202     $conf .= "../conf/iprscan.conf";
203     die "ERROR: Cannot find iprscan.conf\n" if(! -e $conf);
204
205     open(my $IN, "< $conf");
206     my @data = <$IN>;
207     close($IN);
208
209     my ($apps) = grep {/^applications\=/} @data;
210     $apps =~ s/^applications\=//;
211     chomp($apps);
212     @appl = split(',', $apps);
213     die "ERROR: Cannot determine default applications\n" if(! @appl);
214 }
215
216 $CTL_OPT{appl} = \@appl; #apply apps to CTL_OPT
217
218 #build out_base and out_name for datastore
219 $CTL_OPT{out_name} = $CTL_OPT{infile};
220 $CTL_OPT{out_name} =~ /([^\/]+)$/;
221 $CTL_OPT{out_name} = $1;
222 $CTL_OPT{out_name} =~ s/(.+)\.[^\.]+$/$1/;
223 $CTL_OPT{out_base} = Cwd::cwd()."/$CTL_OPT{out_name}.iprscan.output";
224
225 #build localized iprscan database for each node
226 if($CTL_OPT{chpc} && ! -e "$TMP/iprscan/data/ok"){
227     my $lock = new File::NFSLock("$TMP/.iprscan_lock", 'EX', 1200, 1205);
228    
229     #build local db in /tmp
230     if(! -e "$TMP/iprscan/data/ok" && $lock){
231         #remove old failed directories
232         if(-d "$TMP/iprscan"){
233             move("$TMP/iprscan", "$TMP/old");
234             File::Path::rmtree("$TMP/old");
235         }
236         if(-d "$TMP/test"){
237             move("$TMP/test", "$TMP/old");
238             File::Path::rmtree("$TMP/old");
239         }
240
241         my $free = `df /tmp | grep -v \"Filesystem\" | awk \'{print \$4}\'`;
242         if($free > 16000000){
243             my @tar_db = split (":", $ENV{'TAR_DB'});
244             @tar_db = grep {-d $_} @tar_db;
245            
246             if (@tar_db){
247                 my $db = $tar_db[int(rand(@tar_db))];
248                
249                 my @files = ('latest_match.tar.gz',
250                              'latest_pthr.tar.gz',
251                              'latest_nopthr.tar.gz');
252                
253                 if(-e "$db/$files[0]" &&
254                    -e "$db/$files[1]" &&
255                    -e "$db/$files[1]"
256                    ){
257                    
258                     my $fail;
259                     foreach my $file (@files){
260                         last if($fail);
261                         mkdir("$TMP/test") unless(-d "$TMP/test");
262                         copy("$db/$file", "$TMP/test");
263                         system("cd $TMP/test\n".
264                                "tar -zxvmf $file"
265                                );
266                        
267                         $fail = $?;
268                        
269                         unlink("$TMP/test/$file");
270                     }
271                    
272                     if($fail){
273                         File::Path::rmtree("$TMP/test");
274                          
275                     }
276                     else{
277                         move("$TMP/test/iprscan", "$TMP/iprscan");
278                         File::Path::rmtree("$TMP/test");
279                         system("index_data.pl -p $TMP/iprscan/data -inx -bin");
280                         system("touch $TMP/iprscan/data/ok");
281                     }
282                 }
283             }
284         }
285     }
286    
287     $lock->unlock if($lock);
288 }
289
290 #--------------------------------------
291 #---------PRIMARY MPI PROCCESS---------
292 #--------------------------------------
293
294 #--check if root node
295 if ($rank == $root){
296     try{
297         #---test command line  options here
298         #test input files existance
299         if($CTL_OPT{infile} && ! -e $CTL_OPT{infile}){
300             die "ERROR: The input file \'$CTL_OPT{infile}\' does not exist.\n";
301         }
302        
303         #let iprscan test all other options
304         my (undef, $tfile) = tempfile(); #empty dummy test file
305         my $exe = "$EXE -cli";
306         my $command = "$exe " . join(' ', @ARGV);
307         $command .= " -nocrc" if($CTL_OPT{nocrc});
308         $command .= " -seqtype $CTL_OPT{seqtype}" if(defined $CTL_OPT{seqtype});
309         $command .= " -trtable $CTL_OPT{trtable}" if(defined $CTL_OPT{trtable});
310         $command .= " -goterms" if($CTL_OPT{goterms});
311         $command .= " -iprlookup" if($CTL_OPT{iprlookup});
312         $command .= " -format $CTL_OPT{format}" if(defined $CTL_OPT{format});
313         $command .= " -verbose" if($CTL_OPT{verbose});
314         $command .= " -appl " . join(" -appl ", @{$CTL_OPT{appl}}) if(@{$CTL_OPT{appl}});
315         $command .= " -i $tfile" if($CTL_OPT{infile}); #test options on dummy file
316         open(my $PAR, "$command 2>&1 |");
317         my @err = <$PAR>;
318         close($PAR);
319         unlink($tfile);
320        
321         #report errors from iprscan
322         if(! grep {/Error\: Unable to read sequence/} @err){
323             foreach (@err){
324                 $_ =~ s/\/.*iprscan\s+\-cli/mpi_iprscan/;
325             }
326            
327             die join('', @err);
328         }
329     }
330     catch Error::Simple with{
331         my $E = shift;
332         print STDERR $E->{-text};
333         my $code = 2;
334         $code = $E->{-value} if (defined($E->{-value}));
335        
336         exit($code);
337     };   
338
339     #====ACTUAL MPI COMMUNICATION
340     #---main code for distribution of mpi data starts here
341     my $DS_CTL = new ds_utility(\%CTL_OPT);
342     my $iterator = new Iterator::Fasta($CTL_OPT{infile});
343     unlink($CTL_OPT{outfile}) if(-e $CTL_OPT{outfile}); #clear preexisting outfile
344
345     #thread for root node to do other things than just manage mpi
346     my $thr = threads->create(\&node_thread);
347     my $go_mpi_status = 1;
348     $t_need_flag = ($main::no_thread) ? 0 : 1; #set to true for threads false for no threads
349     
350     while($go_mpi_status){
351         #====INTERNAL TIER THREAD
352         #check on results from internal thread
353         if (defined($t_tier_result)){
354             my $t_res = ${thaw($t_tier_result)};
355             $t_tier_result = undef;
356             $active[$root] = 0;
357            
358             $DS_CTL->add_entry($t_res->{-DS});
359            
360             if ($t_res->{-failed}){
361                 push(@failed, $t_res->{-fasta});
362             }
363         }
364         if (defined($t_chunk_result)){
365             my $chunk =  ${thaw($t_chunk_result)};
366             $t_chunk_result = undef;
367             my $id = $chunk->id();
368             ($id) = split (":", $id);
369             push (@{$c_results[$id]}, $chunk);
370             unshift (@{$res_loc[$id]}, $root);
371         }
372        
373         #see if there are chunks to get from the internal thread
374         while((@helper_stack > 0) && (@chunks > 0) && (my $chunk = shift @chunks)){
375             my $helper = shift @helper_stack;
376             $chunk = ${thaw($chunk)};
377            
378             #tell helper node I need help
379             MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);
380            
381             #tell helper node a chunk is coming
382             MPI_Send(\$go_chunk, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD );
383            
384             #send the chunk
385             MPI_SendII(\$chunk, $helper, $mpi_data, MPI_COMM_WORLD);
386         }
387        
388         #get tier for internal thread
389         if($t_need_flag > 0){
390             my $tier;
391             while (my $fasta = $iterator->nextFasta() || shift @failed){
392                 $tier = Process::MpiTiers->new({fasta     => $fasta,
393                                                 CTL_OPT   => \%CTL_OPT,
394                                                 params    => \@ARGV,
395                                                 iprscan   => "$EXE -cli",
396                                                 DS_CTL    => $DS_CTL},
397                                                $root,
398                                                'Process::IPRchunk'
399                                               );
400                
401                 last if(! $tier->terminated);
402             }
403             if(defined $tier && ! $tier->terminated){
404                 $t_need_flag = 0;
405                 my $t_val = freeze(\$tier);
406                 $t_tier = $t_val;
407                 $active[$root] = 1;
408             }
409             else{
410                 $t_need_flag = 2; #take tier or chunk
411             }
412         }
413        
414        
415         #take a node out of limbo if there are failed contigs
416         if(@helper_stack && @failed){
417             my $helper = shift @helper_stack;
418            
419             #tell helper node who I am
420             MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);
421            
422             #tell helper node that no chunk is coming (resets to ask for tier)
423             MPI_Send(\$reset, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD );
424         }
425        
426         #work with mpi nodes or skip mpi if all nodes are waiting in limbo
427         if (@helper_stack < $size - 1){
428             my $who;
429             my $what;
430             my $rs_type;
431            
432             #see who asks for a file
433             MPI_Recv(\$who, 1,  MPI_INT, -2, $who_I_am, MPI_COMM_WORLD);
434            
435             #see what the mpi node wants
436             MPI_Recv(\$what, 1, MPI_INT, $who, $what_I_want, MPI_COMM_WORLD);
437            
438             #if the node wants a tier to process, do this
439             if($what == $need_tier){
440                 #receive result status
441                 MPI_Recv(\$rs_type, 1,  MPI_INT, $who, $result_status, MPI_COMM_WORLD);
442                
443                 #get result if available
444                 if($rs_type == $yes_result){
445                     my $result;
446                     MPI_RecvII(\$result, $who, $mpi_data, MPI_COMM_WORLD);
447                     $DS_CTL->add_entry($result->{-DS});
448                    
449                     if ($result->{-failed}){
450                         push(@failed, $result->{-fasta});
451                     }
452                 }
453                
454                 #if a contig is available send tier
455                 my $tier;
456                
457                 while (my $fasta = $iterator->nextFasta() || shift @failed){
458                 $tier = Process::MpiTiers->new({fasta     => $fasta,
459                                                 CTL_OPT   => \%CTL_OPT,
460                                                 params    => \@ARGV,
461                                                 iprscan   => "$EXE -cli",
462                                                 DS_CTL    => $DS_CTL},
463                                                $who,
464                                                'Process::IPRchunk'
465                                               );
466                    
467                     last if(! $tier->terminated);
468                 }
469                 if(defined $tier && ! $tier->terminated){
470                     #say tier is available and send it
471                     MPI_Send(\$yes_tier, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
472                     MPI_SendII(\$tier, $who, $mpi_data, MPI_COMM_WORLD );
473                     $active[$who] = 1;
474                 }
475                 else{
476                     MPI_Send(\$wait_as_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
477                     push(@helper_stack, $who);
478                     $active[$who] = 0;
479                 }
480             }
481             #if the node wants a helper or needs a chunk result, do this
482             elsif($what == $need_helper || $what == $need_c_res){
483                 #--first send c_res_status
484                 # send ids of nodes with chunk results
485                 if(defined ($res_loc[$who])){
486                     MPI_Send(\$yes_c_res, 1, MPI_INT, $who, $c_res_status, MPI_COMM_WORLD);
487                     MPI_SendII(\$res_loc[$who], $who, $mpi_data, MPI_COMM_WORLD);
488                    
489                     my @locs = @{$res_loc[$who]};
490                     $res_loc[$who] = undef;
491                     #if primary node has chunk result to send then send them
492                     while (defined(my $loc = shift @locs)){
493                         if ($loc == $root){
494                             my $res = shift @{$c_results[$who]};
495                             MPI_SendII(\$res, $who, $mpi_data, MPI_COMM_WORLD);
496                         }
497                     }
498                 }
499                 #no one has anything yet
500                 else{
501                     MPI_Send(\$no_c_res, 1, MPI_INT, $who, $c_res_status, MPI_COMM_WORLD);
502                 }
503                 #continue the rest if the node needs a helper
504                 if($what == $need_helper){
505                     #find the number of helpers required
506                     my $num_helpers_req;
507                     MPI_Recv(\$num_helpers_req, 1, MPI_INT, $who, $work_order, MPI_COMM_WORLD);
508                    
509                     #number of secondary node helpers available
510                     my $sec_node_avail = @helper_stack;
511                    
512                     #number of primary node threads available
513                     my $thr_avail = ($t_need_flag == 2 && ! defined $t_chunk) ? 1 : 0;
514                    
515                     #signal that no helpers are available
516                     if($sec_node_avail == 0 && $thr_avail == 0){
517                         MPI_Send(\$no_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
518                     }
519                     else{ #if node helpers are available
520                         #helpers to send
521                         my $helpers = [];
522                        
523                         #secondary node helpers
524                         if($sec_node_avail > 0){
525                             #seperate the helpers
526                             while(@{$helpers} < $num_helpers_req && @helper_stack > 0){
527                                 my $helper = shift @helper_stack;
528                                 push(@{$helpers}, $helper);
529                             }
530                            
531                             $num_helpers_req -= @{$helpers};
532                         }
533                        
534                         #primary node thread helper
535                         my $root_helper_flag = 0;
536                         if ($thr_avail && $num_helpers_req > 0){
537                             my $helper = $root;
538                             #aways make root node first
539                             unshift(@{$helpers}, $helper);
540                             $root_helper_flag = 1;
541                         }
542                        
543                         #say helper is available and send ids of the helpers
544                         MPI_Send(\$yes_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
545                         MPI_SendII(\$helpers, $who, $mpi_data, MPI_COMM_WORLD);
546                        
547                         #take chunk as a helper
548                         if($root_helper_flag){
549                             #see who's one who needs help
550                             my $who2;
551                             MPI_Recv(\$who2, 1,  MPI_INT, $who, $who_I_am, MPI_COMM_WORLD);
552                            
553                             #get go_chunk request_status
554                             my $req_stat;
555                             MPI_Recv(\$req_stat, 1, MPI_INT, $who2, $request_status, MPI_COMM_WORLD );
556                             if ($req_stat == $go_chunk){
557                                 #get the chunk
558                                 my $chnk;
559                                 MPI_RecvII(\$chnk, $who2, $mpi_data, MPI_COMM_WORLD);
560                                
561                                 $t_need_flag = 0;
562                                 $t_chunk = freeze(\$chnk);
563                             }
564                             elsif($req_stat == $reset){
565                                 #do nothing
566                             }
567                             else{
568                                 die "ERROR: Logic error in getting chunk as a helper\n";
569                             }
570                         }
571                     }
572                 }
573             }
574             #if the node has a chunk result, do this
575             elsif($what == $have_c_res){
576                 #get the owner of the result
577                 my $owner;
578                 MPI_Recv(\$owner, 1, MPI_INT, $who, $work_order, MPI_COMM_WORLD);
579                
580                 if($owner == $root){ #if root is owner get result
581                     my $chunk_res;
582                     MPI_RecvII(\$chunk_res, $who, $mpi_data, MPI_COMM_WORLD);
583                     push(@returned_chunks, freeze(\$chunk_res));
584                 }
585                 else{ #take note of owner to tell him he has a result waiting
586                     push(@{$res_loc[$owner]}, $who);
587                 }
588             }
589             #if what the node wants is something else
590             else{
591                 die "ERROR: Invalid request type\n";
592             }
593         }
594         else{ #take a break if mpi was skipped
595             #this keeps the root node from hogging resources if only the thread is active
596             sleep 2;
597         }
598        
599         #see if all contigs are finished
600         $go_mpi_status = 0;
601         foreach my $n (@active ){
602             if(@helper_stack < $size - 1){
603                 $go_mpi_status = 1;
604                 last;
605             }
606             if((defined($n) && $n == 1)){
607                 $go_mpi_status = 1;
608                 last;
609             }
610         }
611         if(! $iterator->finished || @failed > 0){
612             $go_mpi_status = 1;
613         }
614     }
615    
616     #---tell mpi nodes to terminate
617     for(my $i = 1; $i < $size; $i++){
618         #tell chunks waiting for helper who I am
619         MPI_Send(\$rank, 1,  MPI_INT, $i, $who_I_am, MPI_COMM_WORLD);
620        
621         #send termination signal
622         MPI_Send(\$terminate, 1, MPI_INT, $i, $request_status, MPI_COMM_WORLD);
623     }
624    
625     #---release thread
626     $t_terminate = 1; #signals to thread to clean up
627     $thr->detach() unless($thr->is_detached);
628    
629     print STDERR "\n\nmpi_iprscan is now finished!!!\n\n" unless($main::quiet);
630 }
631 #------SECONDARY MPI PROCESSES------
632 else {
633     my $go_mpi_status = 1;
634     my $tier_result;
635     my $tier;
636     my $chunk_result;
637    
638     while ($go_mpi_status) {
639         #tell the  primary process what node it is speaking to
640         MPI_Send(\$rank, 1, MPI_INT, $root, $who_I_am, MPI_COMM_WORLD );
641        
642         #decide what this node needs
643         my $what;
644         my $chunk;
645        
646         if(defined $chunk_result){
647             $what = $have_c_res;
648         }
649         elsif(!defined($tier) || $tier->terminated){
650             $what = $need_tier;
651             if(defined($tier)){
652                 #collect errors and failures if any
653                 $tier_result->{-error} = $tier->error;
654                 $tier_result->{-failed} = $tier->failed;
655                 $tier_result->{-DS} = $tier->DS;
656                 $tier_result->{-fasta} = $tier->fasta if($tier->failed);
657                 $tier = undef;
658             }
659         }
660         elsif(($chunk = $tier->next_chunk) && ($tier->num_chunks > 0)){
661             $what = $need_helper;
662         }
663         else{
664             $what = $need_c_res;
665         }
666        
667         #--tell primary node what this node needs
668         MPI_Send(\$what, 1, MPI_INT, $root, $what_I_want, MPI_COMM_WORLD );
669        
670         #if what I want is a tier do this
671         if($what == $need_tier){
672             #Send result status
673             my $rs_type = (defined($tier_result)) ? $yes_result: $no_result;
674             MPI_Send(\$rs_type, 1, MPI_INT, $root, $result_status, MPI_COMM_WORLD );
675            
676             #Send result if available
677             if($rs_type == $yes_result){
678                 MPI_SendII(\$tier_result, $root, $mpi_data, MPI_COMM_WORLD);
679                 $tier_result = undef;
680             }
681            
682             #get request_status for the tier
683             my $req_status;
684             MPI_Recv(\$req_status, 1, MPI_INT, $root, $request_status, MPI_COMM_WORLD );
685            
686             #get tier and run if it if there is one
687             if($req_status == $yes_tier){
688                 MPI_RecvII(\$tier, $root, $mpi_data, MPI_COMM_WORLD );
689                 $tier->run;
690             } #wait as helper if asked to
691             elsif($req_status == $wait_as_helper){
692                 #see who needs help
693                 my $who;
694                 MPI_Recv(\$who, 1,  MPI_INT, -2, $who_I_am, MPI_COMM_WORLD);
695                
696                 #get request_status for chunk
697                 my $chunk_status;
698                 MPI_Recv(\$chunk_status, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD );
699                
700                 #if there is a chunk do this
701                 if($chunk_status == $go_chunk){
702                     #get chunk to process
703                     my $chnk;
704                     MPI_RecvII(\$chnk, $who, $mpi_data, MPI_COMM_WORLD);
705                    
706                     #run chunk
707                     $chnk->run($rank);
708                     $chunk_result = $chnk;
709                 }
710                 #if the reset signal is received do this
711                 elsif($chunk_status == $reset){
712                     #do nothing
713                 }
714                 #if the terminate signal is received do this
715                 elsif($chunk_status == $terminate){
716                     $go_mpi_status = 0;
717                     last;
718                 }
719                 else{
720                     die "ERROR: Invalid chunk status signal\n;";
721                 }
722             }
723             else{
724                 die "ERROR: Invalid request status type\n";
725             }
726         } #if what I want is help or the result from a helper do this
727         elsif ($what == $need_helper || $what == $need_c_res){
728             # check c_result_status
729             my $c_res_stat;
730             MPI_Recv(\$c_res_stat, 1, MPI_INT, $root, $c_res_status, MPI_COMM_WORLD);
731            
732             #if there are chunk results, do this
733             my $locs;
734             if($c_res_stat == $yes_c_res){
735                 #get ids of nodes with chunk result
736                 MPI_RecvII(\$locs, $root, $mpi_data, MPI_COMM_WORLD);
737                
738                 #get chunk results from only the root node for now
739                 foreach my $loc (@{$locs}){
740                     next if ($loc != $root);
741                     my $c_res;
742                     MPI_RecvII(\$c_res, $loc, $mpi_data, MPI_COMM_WORLD);
743                     $tier->update_chunk($c_res);
744                 }
745             }
746            
747             #continue the rest if the node needs a helper
748             if ($what == $need_helper){
749                 #send the number of helpers required
750                 my $num_helpers_req = $tier->num_chunks;
751                 MPI_Send(\$num_helpers_req, 1, MPI_INT, $root, $work_order, MPI_COMM_WORLD);
752                
753                 #see if helper is available
754                 my $help_stat;
755                 MPI_Recv(\$help_stat, 1, MPI_INT, $root, $request_status, MPI_COMM_WORLD);
756                
757                 if($help_stat == $yes_helper){
758                     my $helpers;
759                     MPI_RecvII(\$helpers, $root, $mpi_data, MPI_COMM_WORLD);
760                    
761                     #send chunk to helper
762                     foreach my $helper (@{$helpers}){
763                         #say I'm the one who needs help
764                         MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);
765                        
766                         #send go_chunk request_status
767                         MPI_Send(\$go_chunk, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD);
768                        
769                         #send chunk
770                         my $chnk = $tier->next_chunk;
771                         MPI_SendII(\$chnk, $helper, $mpi_data, MPI_COMM_WORLD);
772                     }
773                 }
774             }
775            
776             #get chunk results from non root nodes since root comm has terminated
777             foreach my $loc (@{$locs}){
778                 next if ($loc == $root);
779                 my $c_res;
780                 MPI_RecvII(\$c_res, $loc, $mpi_data, MPI_COMM_WORLD);
781                 $tier->update_chunk($c_res);
782             }
783            
784             #run the chunk if there is one
785             if(defined($chunk)){
786                 $chunk->run($rank);
787                 $tier->update_chunk($chunk);
788                 $tier->run();
789                 $chunk = undef;
790             }
791         }
792         #if just finished a helper chunk, inform that it is finished
793         elsif($what == $have_c_res){
794             #send the owner id of the result
795             my $owner = $chunk_result->id();
796             ($owner) = split(":", $owner);
797             MPI_Send(\$owner, 1, MPI_INT, $root, $work_order, MPI_COMM_WORLD);
798            
799             #send the result
800             MPI_SendII(\$chunk_result, $owner, $mpi_data, MPI_COMM_WORLD);
801             $chunk_result = undef;
802         }
803     }
804 }
805
806 #---------ALL NODES----------
807 MPI_Finalize(); #terminate MPI
808
809 exit(0);
810
811 #-----------------------------------------------------------------------------
812 #----------------------------------- SUBS ------------------------------------
813 #-----------------------------------------------------------------------------
814 #other things for root node to do
815 #(thread allows root to process tiers like a secondary node)
816 sub node_thread {
817     my $tier;
818     my $chunk;
819    
820     if($main::no_thread){ #pause and return if no thread is needed
821         $t_need_flag = 0;
822         return;
823     }
824
825     $t_need_flag = 1;
826
827     while(not $t_terminate){
828         #load serialized tier into tier
829         if(! defined ($tier) && defined ($t_tier)){
830             $t_need_flag = 0;
831             $tier = ${thaw($t_tier)};
832             $t_tier = undef;
833             next;
834         } #process tier
835         elsif(defined($tier)){
836             #get chunk results from other nodes
837             while(my $res = shift @returned_chunks){
838                 $res = ${thaw($res)};
839                 $tier->update_chunk($res);
840             }
841            
842             #run the tier as far as possible
843             $tier->run;
844            
845             #get all chunks available
846             my $chnk = $tier->next_chunk;
847             while(my $o_chnk = $tier->next_chunk){
848                 $o_chnk = freeze(\$o_chnk);
849                 push (@chunks, $o_chnk);
850             }
851            
852             #run chunks one at a time
853             $chnk->run($rank) if ($chnk);
854             $tier->update_chunk($chnk) if ($chnk);
855             while($chnk = shift @chunks){
856                 $chnk = ${thaw($chnk)};
857                 if($tier->failed){ #skip chunks after failure
858                     $tier->update_chunk($chnk);
859                     next;
860                 }
861                
862                 $chnk->run($rank);
863                 $tier->update_chunk($chnk);
864             }
865            
866             #let tier advance if possible
867             $tier->run();
868
869             #terminate tier, wait, or continue
870             if($tier->terminated){
871                 my $tier_result;
872                 $tier_result->{-error} = $tier->error;
873                 $tier_result->{-failed} = $tier->failed;
874                 $tier_result->{-DS} = $tier->DS;
875                 $tier_result->{-fasta} = $tier->fasta if ($tier->failed);
876                
877                 sleep 1 while (defined ($t_tier_result)); #pause incase result will be overwritten
878                 $t_tier_result = freeze(\$tier_result);
879                 $tier = undef;
880                 $t_need_flag = 1;
881             } #take a break
882             elsif($tier->num_chunks == 0){
883                 #keeps thread from hogging resources while waiting for external results
884                 sleep 1;
885             }
886            
887             next;
888         } #load serialized chunk into chunk
889         elsif(! defined ($chunk) && defined ($t_chunk)){
890             $t_need_flag = 0;
891             $chunk = ${thaw($t_chunk)};
892             $t_chunk = undef;
893             next;
894         } #process chunk
895         elsif(defined($chunk)){
896             $chunk->run($rank);
897             sleep 1 while (defined ($t_chunk_result)); #pause incase result will be overwritten
898             $t_chunk_result = freeze(\$chunk);
899             $chunk = undef;
900             $t_need_flag = 1;
901             next;
902         } #take a break
903         else{
904             #keeps thread form hogging resources when there is nothing to do
905             sleep 1;
906         }
907     }
908 }
909 #----------------------------------------------------------------------------
910 #easy dump of string to a tempfile
911 sub totemp{
912     my $data = shift @_;
913    
914     my ($fh, $name) = tempfile();
915     print $fh $data;
916     close ($fh);
917    
918     return $name;
919 }
920 #----------------------------------------------------------------------------
921 #sends scalar variable contents via serialization
922 #scalar can hold ref to other data structures
923 sub MPI_SendII{
924     my $msg = shift @_;
925     my $target = shift @_;
926     my $tag = shift @_;
927     my $communicator = shift @_;
928    
929     my $send = freeze($msg);
930     my $length = length($send);
931    
932     MPI_Send(\$length, 1, MPI_INT, $target, $message_length, $communicator);
933     MPI_Send(\$send, $length, MPI_CHAR, $target, $tag, $communicator);
934 }
935 #----------------------------------------------------------------------------
936 #receives serialized scalar variable
937 #scalar can hold ref to other data structures
938 sub MPI_RecvII{
939     my $ref = shift @_;
940     my $source = shift @_;
941     my $tag = shift @_;
942     my $communicator = shift @_;
943    
944     my $length;
945     my $recv;
946    
947    
948     MPI_Recv(\$length, 1, MPI_INT, $source, $message_length, $communicator);
949     MPI_Recv(\$recv, $length, MPI_CHAR, $source, $tag, $communicator); #receive line
950     
951     ${$ref} = ${thaw($recv)};
952 }
Note: See TracBrowser for help on using the browser.