root/MPI/mpi_maker

Revision 277, 26.7 kB (checked in by cholt, 3 weeks ago)

attempt to fix exonerate holdover errors

  • Property svn:executable set to *
Line 
1 #!/usr/bin/perl -w
2
3 eval 'exec /usr/bin/perl -w -S $0 ${1+"$@"}'
4     if 0; # not running under some shell
5
6 use strict "vars";
7 use strict "refs";
8
9 use FindBin;
10 use lib "$FindBin::Bin/../lib";
11 use lib "$FindBin::Bin/../perl/lib";
12 use vars qw($RANK $LOG $CMD_ARGS);
13 use Storable qw(freeze thaw);
14 use threads;
15 use threads::shared;
16
17 BEGIN{
18    if (not ($ENV{CGL_SO_SOURCE})) {
19       $ENV{CGL_SO_SOURCE} = "$FindBin::Bin/../lib/CGL/so.obo";
20    }
21    if (not ($ENV{CGL_GO_SOURCE})) {
22       $ENV{CGL_GO_SOURCE} = "$FindBin::Bin/../lib/CGL/gene_ontology.obo"
23    }
24
25    $CMD_ARGS = join(' ', @ARGV);
26
27    #what to do on ^C
28    $SIG{'INT'} = sub {
29       print STDERR "\n\nMaker aborted by user!!\n\n";
30       my @threads = threads->list;
31       foreach my $thr (@threads){
32          $thr->detach;
33       }
34       exit (1);
35    };
36
37    #supress warnings from storable module
38    $SIG{'__WARN__'} = sub {
39       warn $_[0] if ( $_[0] !~ /Not a CODE reference/ &&
40                       $_[0] !~ /Can\'t store item CODE/
41                     );
42    };
43
44    #output to log file of seq that caused rank to die
45    $SIG{'__DIE__'} =
46    sub {
47       if (defined ($LOG) && defined $_[0]) {
48          my $die_count = $LOG->get_die_count();
49          $die_count++;
50
51          $LOG->add_entry("DIED","RANK",$RANK);
52          $LOG->add_entry("DIED","COUNT",$die_count);
53       }
54
55       my @threads = threads->list;
56       foreach my $thr (@threads){
57          $thr->detach;
58       }
59
60       die "#----------------------\n",
61           "FATAL: failed!!\n",
62           "#----------------------\n",
63           $_[0] . "\n";
64    };
65 }
66
67 use Cwd;
68 use Storable;
69 use FileHandle;
70 use File::Path;
71 use Getopt::Long qw(:config no_ignore_case);
72 use File::Temp qw(tempfile tempdir);
73 use Bio::DB::Fasta;
74 use GI;
75 use Dumper::GFF::GFFV3;
76 use Iterator::Any;
77 use Iterator::Fasta;
78 use Iterator::GFF3;
79 use Fasta;
80 use FastaChunker;
81 use maker::auto_annotator;
82 use cluster;
83 use repeat_mask_seq;
84 use runlog;
85 use ds_utility;
86 use GFFDB;
87 use Error qw(:try);
88 use Error::Simple;
89 use Process::MpiChunk;
90 use Process::MpiTiers;
91 use Parallel::MPIcar qw(:all);
92
93 unless($threads::VERSION >= 1.67){
94    die "mpi_maker requires threads version 1.67 or greater\n",
95        "You have version ". $threads::VERSION ."\n";
96 }
97
98 #--MPI_Init requires there to be arguments on @ARGV
99 #--This is a logic problem by the Package Authors
100 #--This is a hack to solve the problem
101 if (not @ARGV) {
102    push (@ARGV, 'null');
103    MPI_Init();                  #initiate the MPI
104    shift @ARGV;
105 }
106 else {
107    MPI_Init();                  #initiate the MPI
108 }
109
110 $| = 1;
111
112 my $usage = "
113 Usage:
114
115      mpi_maker [options] <maker_opts> <maker_bopts> <maker_exe>
116
117      Maker is a program that produces gene annotations in GFF3 file format using
118      evidence such as EST alignments and protein homology.  Maker can be used to
119      produce gene annotations for new genomes as well as update annoations from
120      existing genome databases.
121
122      The four input arguments are user control files that specify how maker
123      should behave. The evaluator options file contains control options specific
124      for the evaluation of gene annotations. All options for maker should be set
125      in the control files, but a few can also be set on the command line.
126      Command line options provide a convenient machanism to override commonly
127      altered control file values.
128
129      Input files listed in the control options files must be in fasta format.
130      Please see maker documentation to learn more about control file
131      configuration.  Maker will automatically try and locate the user control
132      files in the current working directory if these arguments are not supplied
133      when initializing maker.
134
135      It is important to note that maker does not try and recalculated data that
136      it has already calculated.  For example, if you run an analysis twice on
137      the same dataset file you will notice that maker does not rerun any of the
138      blast analyses, but instead uses the blast analyses stored from the
139      previous run.  To force maker to rerun all analyses, use the -f flag.
140
141
142 Options:
143
144      -genome|g <filename> Specify the genome file.
145
146      -predictor|p <type>  Selects the predictor(s) to use when building
147                           annotations.  Defines a pool of gene models for
148                           annotation selection.
149
150                           types: snap
151                                  augustus
152                                  fgenesh
153                                  genemark
154                                  est2genome (Uses EST's directly)
155                                  model_gff (Pass through GFF3 annotations)
156                                  pred_gff (Uses passed through GFF3 predictions)
157
158                           Use a ',' to seperate types (nospaces)
159                           i.e. -predictor=snap,augustus,fgenesh
160
161      -RM_off|R           Turns all repeat masking off.
162
163      -retry|r <integer>  Rerun failed contigs up to the specified count.
164
165      -cpus|c  <integer>  Tells how many cpus to use for BLAST analysis.
166
167      -force|f            Forces maker to delete old files before running again.
168                          This will require all blast analyses to be rerun.
169
170      -again|a            Caculate all annotations and output files again even if
171                          no settings have changed.
172
173      -evaluate|e         Run Evaluator on final annotations (under development).
174
175      -quiet|q            Silences most of maker's status messages.
176
177      -CTL                Generate empty control files in the current directory.
178
179      -help|?             Prints this usage statement.
180
181
182 ";
183
184 #-------------------------------------------------------------------------------
185 #------------------------------------ MAIN -------------------------------------
186 #-------------------------------------------------------------------------------
187
188 #--set object variables for serialization of data
189 $Storable::forgive_me = 1; #allows serializaion of objects with code refs
190
191 #------INITIATE MPI VARIABLES------
192 my $rank = MPI_Comm_rank(MPI_COMM_WORLD); #my proccess number
193 my $size = MPI_Comm_size(MPI_COMM_WORLD); #how many proccesses
194 $RANK = $rank;
195
196 #MPI SIGNAL CODES
197 #--mpi message tags
198 my $who_I_am       = 1111;
199 my $what_I_want    = 2222;
200 my $result_status  = 3333;
201 my $request_status = 4444;
202 my $c_res_status   = 5555;
203 my $chunk_status   = 6666;
204 my $work_order     = 7777; #generic data tag
205 my $mpi_data       = 8888;
206 my $message_length = 9999;
207
208 #--what_I_want type signals
209 my $need_tier   = 1;
210 my $need_helper = 2;
211 my $have_c_res  = 3;
212 my $need_c_res  = 4;
213
214 #--request_status signals
215 my $wait_as_helper = 1;
216 my $yes_tier       = 2;
217 my $yes_helper     = 3;
218 my $no_helper      = 4;
219 my $go_chunk       = 5;
220 my $reset          = 6;
221 my $terminate      = 0;
222
223 #--results_status signals
224 my $yes_result = 1;
225 my $no_result  = 0;
226
227 #--c_res_status signal
228 my $yes_c_res      = 1;
229 my $no_c_res      = 0;
230
231 #--chunk_status signals
232 my $yes_chunk = 1;
233 my $no_chunk  = 0;
234
235 #---variables for thread and the root node
236 my @c_results;
237 my @failed;
238 my @res_loc;
239 my @helper_stack;
240 my @active;
241 my @chunks : shared;
242 my @returned_chunks :shared;
243 my $t_need_flag :shared;
244 my $t_tier :shared;
245 my $t_tier_result :shared;
246 my $t_chunk :shared;
247 my $t_chunk_result :shared;
248 my $t_terminate :shared;
249
250 #---global variables
251 my %OPT;
252 my $root = 0; #define root node (only changed for debugging)
253
254 #---Process options on the command line
255 try{
256     GetOptions("RM_off|R" => \$OPT{R},
257                "force|f" => \$OPT{force},
258                "genome|g=s" => \$OPT{genome},
259                "cpus|c=i" => \$OPT{cpus},
260                "predictor=s" =>\$OPT{predictor},
261                "retry=i" =>\$OPT{retry},
262                "evaluate" =>\$OPT{evaluate},
263                "again|a" =>\$OPT{again},
264                "quiet" =>\$main::quiet,
265                "no_thread" =>\$main::no_thread,
266                "CTL" => sub {GI::generate_control_files() if($rank == $root); MPI_Finalize(); exit(0);},
267                "help|?" => sub {print $usage if($rank == $root); MPI_Finalize(); exit(0)}
268                );
269 }
270 catch Error::Simple with{
271     my $E = shift;
272
273     print STDERR $E->{-text};
274     die "\n\nMaker failed parsing command line options!!\n\n";
275 };
276
277 #--------------------------------------
278 #---------PRIMARY MPI PROCCESS---------
279 #--------------------------------------
280
281 #--check if root node
282 if ($rank == $root) {
283    #varibles that are persistent outside of try
284    my %CTL_OPT;
285    my $iterator;
286    my $DS_CTL;
287    my $GFF_DB;
288    my $build;
289
290    try{
291       #get arguments off the command line
292       my @ctlfiles = @ARGV;
293
294       if (not @ctlfiles) {
295          if (-e "maker_opts.ctl" &&
296              -e "maker_bopts.ctl" &&
297              -e "maker_exe.ctl"
298             ) {
299
300             @ctlfiles = ("maker_opts.ctl",
301                          "maker_bopts.ctl",
302                          "maker_exe.ctl"
303                         );
304          }
305          else {
306             print STDERR  "ERROR: Control files not found\n";
307             print $usage;
308             exit(0);
309          }
310       }
311
312       #--Control file processing
313
314       #set up control options from control files
315       %CTL_OPT = GI::load_control_files(\@ctlfiles, \%OPT, $size);
316
317       #--open datastructure controller
318       $DS_CTL = ds_utility->new(\%CTL_OPT);
319
320       #--set up gff database
321       $GFF_DB = new GFFDB(\%CTL_OPT);
322       $build = $GFF_DB->next_build;
323
324       #---load genome multifasta/GFF3 file
325       $iterator = new Iterator::Any( -fasta => $CTL_OPT{'genome'},
326                                      -gff => $CTL_OPT{'genome_gff'},
327                                    );
328    }
329    catch Error::Simple with{
330       my $E = shift;
331       print STDERR $E->{-text};
332       print STDERR "\n\nMaker failed while examining startup data\n",
333                    "(control files and input fasta files)!!\n\n";
334       my $code = 2;
335       $code = $E->{-value} if (defined($E->{-value}));
336
337       exit($code);
338    };
339
340    #build indexes of databases
341    #Shared_Functions::build_all_indexes($CTL_OPT{old_protein},
342    #                                   $CTL_OPT{old_est}
343    #                                  );
344
345    #====ACTUAL MPI COMMUNICATION
346
347    #---main code for distribution of mpi data starts here
348
349    #thread for root node to do other things than just manage mpi
350    my $thr = threads->create(\&node_thread);
351    my $go_mpi_status = 1;
352    $t_need_flag = ($main::no_thread) ? 0 : 1; #set to true for threads false for no threads
353
354    while($go_mpi_status){
355       #====INTERNAL TIER THREAD
356       #check on results from internal thread
357       if (defined($t_tier_result)){
358          my $t_res = ${thaw($t_tier_result)};
359          $t_tier_result = undef;
360          $active[$root] = 0;
361
362          $DS_CTL->add_entry($t_res->{-DS});
363
364          if ($t_res->{-failed}){
365             push(@failed, $t_res->{-fasta});
366          }
367       }
368       if (defined($t_chunk_result)){
369          my $chunk =  ${thaw($t_chunk_result)};
370          $t_chunk_result = undef;
371          my $id = $chunk->id();
372          ($id) = split (":", $id);
373          push (@{$c_results[$id]}, $chunk);
374          unshift (@{$res_loc[$id]}, $root);
375       }
376
377       #see if there are chunks to get from the internal thread
378       while((@helper_stack > 0) && (@chunks > 0) && (my $chunk = shift @chunks)){
379          my $helper = shift @helper_stack;
380          $chunk = ${thaw($chunk)};
381
382          #tell helper node I need help
383          MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);
384
385          #tell helper node a chunk is coming
386          MPI_Send(\$go_chunk, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD );
387
388          #send the chunk
389          MPI_SendII(\$chunk, $helper, $mpi_data, MPI_COMM_WORLD);
390       }
391
392       #get tier for internal thread
393       if($t_need_flag > 0){
394          my $tier;
395          while (my $fasta = $iterator->nextFasta() || shift @failed){
396             $tier = Process::MpiTiers->new({fasta =>$fasta,
397                                             CTL_OPT => \%CTL_OPT,
398                                             DS_CTL  => $DS_CTL,
399                                             GFF_DB  => $GFF_DB,
400                                             build   => $build},
401                                            $root,
402                                            'Process::MpiChunk'
403                                           );
404
405             last if(! $tier->terminated);
406          }
407          if(defined $tier && ! $tier->terminated){
408             $t_need_flag = 0;
409             my $t_val = freeze(\$tier);
410             $t_tier = $t_val;
411             $active[$root] = 1;
412          }
413          else{
414             $t_need_flag = 2; #take tier or chunk
415          }
416       }
417
418
419       #take a node out of limbo if there are failed contigs
420       if(@helper_stack && @failed){
421          my $helper = shift @helper_stack;
422
423          #tell helper node who I am
424          MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);
425
426          #tell helper node that no chunk is coming (resets to ask for tier)
427          MPI_Send(\$reset, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD );
428       }
429
430
431       #work with mpi nodes or skip mpi if all nodes are waiting in limbo
432       if (@helper_stack < $size - 1){
433          my $who;
434          my $what;
435          my $rs_type;
436
437          #see who asks for a file
438          MPI_Recv(\$who, 1,  MPI_INT, -2, $who_I_am, MPI_COMM_WORLD);
439
440          #see what the mpi node wants
441          MPI_Recv(\$what, 1, MPI_INT, $who, $what_I_want, MPI_COMM_WORLD);
442
443          #if the node wants a tier to process, do this
444          if($what == $need_tier){
445             #receive result status
446             MPI_Recv(\$rs_type, 1,  MPI_INT, $who, $result_status, MPI_COMM_WORLD);
447
448             #get result if available
449             if($rs_type == $yes_result){
450                my $result;
451                MPI_RecvII(\$result, $who, $mpi_data, MPI_COMM_WORLD);
452                $DS_CTL->add_entry($result->{-DS});
453
454                if ($result->{-failed}){
455                   push(@failed, $result->{-fasta});
456                }
457             }
458
459             #if a contig is available send tier
460             my $tier;
461
462             while (my $fasta = $iterator->nextFasta() || shift @failed){
463                $tier = Process::MpiTiers->new({fasta => $fasta,
464                                                CTL_OPT => \%CTL_OPT,
465                                                DS_CTL  => $DS_CTL,
466                                                GFF_DB  => $GFF_DB,
467                                                build   => $build},
468                                               $who,
469                                               'Process::MpiChunk'
470                                              );
471
472                last if(! $tier->terminated);
473             }
474             if(defined $tier && ! $tier->terminated){
475                #say tier is available and send it
476                MPI_Send(\$yes_tier, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
477                MPI_SendII(\$tier, $who, $mpi_data, MPI_COMM_WORLD );
478                $active[$who] = 1;
479             }
480             else{
481                MPI_Send(\$wait_as_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
482                push(@helper_stack, $who);
483                $active[$who] = 0;
484             }
485          }
486          #if the node wants a helper or needs a chunk result, do this
487          elsif($what == $need_helper || $what == $need_c_res){
488             #--first send c_res_status
489             # send ids of nodes with chunk results
490             if(defined ($res_loc[$who])){
491                MPI_Send(\$yes_c_res, 1, MPI_INT, $who, $c_res_status, MPI_COMM_WORLD);
492                MPI_SendII(\$res_loc[$who], $who, $mpi_data, MPI_COMM_WORLD);
493
494                my @locs = @{$res_loc[$who]};
495                $res_loc[$who] = undef;
496
497                #if primary node has chunk result to send then send them
498                while (defined(my $loc = shift @locs)){
499                   if ($loc == $root){
500                      my $res = shift @{$c_results[$who]};
501                      MPI_SendII(\$res, $who, $mpi_data, MPI_COMM_WORLD);
502                   }
503                }
504             }
505             #no one has anything yet
506             else{
507                MPI_Send(\$no_c_res, 1, MPI_INT, $who, $c_res_status, MPI_COMM_WORLD);
508             }
509
510             #continue the rest if the node needs a helper
511             if($what == $need_helper){
512                #find the number of helpers required
513                my $num_helpers_req;
514                MPI_Recv(\$num_helpers_req, 1, MPI_INT, $who, $work_order, MPI_COMM_WORLD);
515
516                #number of secondary node helpers available
517                my $sec_node_avail = @helper_stack;
518
519                #number of primary node threads available
520                my $thr_avail = ($t_need_flag == 2 && ! defined $t_chunk) ? 1 : 0;
521
522                #signal that no helpers are available
523                if($sec_node_avail == 0 && $thr_avail == 0){
524                   MPI_Send(\$no_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
525                }
526                else{#if node helpers are available
527                   #helpers to send
528                   my $helpers = [];
529
530                   #secondary node helpers
531                   if($sec_node_avail > 0){
532                      #seperate the helpers
533                      while(@{$helpers} < $num_helpers_req && @helper_stack > 0){
534                         my $helper = shift @helper_stack;
535                         push(@{$helpers}, $helper);
536                      }
537
538                      $num_helpers_req -= @{$helpers};
539                   }
540
541                   #primary node thread helper
542                   my $root_helper_flag = 0;
543                   if ($thr_avail && $num_helpers_req > 0){
544                      my $helper = $root;
545                      #aways make root node first
546                      unshift(@{$helpers}, $helper);
547                      $root_helper_flag = 1;
548                   }
549
550                   #say helper is available and send ids of the helpers
551                   MPI_Send(\$yes_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
552                   MPI_SendII(\$helpers, $who, $mpi_data, MPI_COMM_WORLD);
553
554                   #take chunk as a helper
555                   if($root_helper_flag){
556                      #see who's one who needs help
557                      my $who2;
558                      MPI_Recv(\$who2, 1,  MPI_INT, $who, $who_I_am, MPI_COMM_WORLD);
559
560                      #get go_chunk request_status
561                      my $req_stat;
562                      MPI_Recv(\$req_stat, 1, MPI_INT, $who2, $request_status, MPI_COMM_WORLD );
563                      if ($req_stat == $go_chunk){
564                         #get the chunk
565                         my $chnk;
566                         MPI_RecvII(\$chnk, $who2, $mpi_data, MPI_COMM_WORLD);
567
568                         $t_need_flag = 0;
569                         $t_chunk = freeze(\$chnk);
570                      }
571                      elsif($req_stat == $reset){
572                         #do nothing
573                      }
574                      else{
575                         die "ERROR: Logic error in getting chunk as a helper\n";
576                      }
577                   }
578                }
579             }
580          }
581          #if the node has a chunk result, do this
582          elsif($what == $have_c_res){
583             #get the owner of the result
584             my $owner;
585             MPI_Recv(\$owner, 1, MPI_INT, $who, $work_order, MPI_COMM_WORLD);
586
587             if($owner == $root){#if root is owner get result
588                my $chunk_res;
589                MPI_RecvII(\$chunk_res, $who, $mpi_data, MPI_COMM_WORLD);
590                push(@returned_chunks, freeze(\$chunk_res));
591             }
592             else{#take note of owner to tell him he has a result waiting
593                push(@{$res_loc[$owner]}, $who);
594             }
595          }
596          #if what the node wants is something else
597          else{
598             die "ERROR: Invalid request type\n";
599          }
600       }
601       else{ #take a break if mpi was skipped
602          #this keeps the root node from hogging resources if only the thread is active
603          sleep 2;
604       }
605
606       #see if all contigs are finished
607       $go_mpi_status = 0;
608       foreach my $n (@active ){
609          if(@helper_stack < $size - 1){
610             $go_mpi_status = 1;
611             last;
612          }
613          if((defined($n) && $n == 1)){
614             $go_mpi_status = 1;
615             last;
616          }
617       }
618       if(! $iterator->finished || @failed > 0){
619           $go_mpi_status = 1;
620       }
621    }
622
623    #---tell mpi nodes to terminate
624    for(my $i = 1; $i < $size; $i++){
625       #tell chunks waiting for helper who I am
626       MPI_Send(\$rank, 1,  MPI_INT, $i, $who_I_am, MPI_COMM_WORLD);
627
628       #send termination signal
629       MPI_Send(\$terminate, 1, MPI_INT, $i, $request_status, MPI_COMM_WORLD);
630    }
631
632    #---release thread
633    $t_terminate = 1; #signals to thread to clean up
634    $thr->detach() unless($thr->is_detached);
635
636    print STDERR "\n\nMaker is now finished!!!\n\n";
637 }
638 #------SECONDARY MPI PROCESSES------
639 else {
640    my $go_mpi_status = 1;
641    my $tier_result;
642    my $tier;
643    my $chunk_result;
644
645    while ($go_mpi_status) {
646       #tell the  primary process what node it is speaking to
647       MPI_Send(\$rank, 1, MPI_INT, $root, $who_I_am, MPI_COMM_WORLD );
648
649       #decide what this node needs
650       my $what;
651       my $chunk;
652
653       if(defined $chunk_result){
654          $what = $have_c_res;
655       }
656       elsif(!defined($tier) || $tier->terminated){
657          $what = $need_tier;
658          if(defined($tier)){
659             #collect errors and failures if any
660             $tier_result->{-error} = $tier->error;
661             $tier_result->{-failed} = $tier->failed;
662             $tier_result->{-DS} = $tier->DS;
663             $tier_result->{-fasta} = $tier->fasta if($tier->failed);
664             $tier = undef;
665          }
666       }
667       elsif(($chunk = $tier->next_chunk) && ($tier->num_chunks > 0)){
668          $what = $need_helper;
669       }
670       else{
671          $what = $need_c_res;
672       }
673
674       #--tell primary node what this node needs
675       MPI_Send(\$what, 1, MPI_INT, $root, $what_I_want, MPI_COMM_WORLD );
676
677       #if what I want is a tier do this
678       if($what == $need_tier){
679          #Send result status
680          my $rs_type = (defined($tier_result)) ? $yes_result: $no_result;
681          MPI_Send(\$rs_type, 1, MPI_INT, $root, $result_status, MPI_COMM_WORLD );
682
683          #Send result if available
684          if($rs_type == $yes_result){
685             MPI_SendII(\$tier_result, $root, $mpi_data, MPI_COMM_WORLD);
686             $tier_result = undef;
687          }
688
689          #get request_status for the tier
690          my $req_status;
691          MPI_Recv(\$req_status, 1, MPI_INT, $root, $request_status, MPI_COMM_WORLD );
692
693          #get tier and run if it if there is one
694          if($req_status == $yes_tier){
695             MPI_RecvII(\$tier, $root, $mpi_data, MPI_COMM_WORLD );
696             $tier->run;
697          }#wait as helper if asked to
698          elsif($req_status == $wait_as_helper){
699             #see who needs help
700             my $who;
701             MPI_Recv(\$who, 1,  MPI_INT, -2, $who_I_am, MPI_COMM_WORLD);
702
703             #get request_status for chunk
704             my $chunk_status;
705             MPI_Recv(\$chunk_status, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD );
706
707             #if there is a chunk do this
708             if($chunk_status == $go_chunk){
709                #get chunk to process
710                my $chnk;
711                MPI_RecvII(\$chnk, $who, $mpi_data, MPI_COMM_WORLD);
712
713                #run chunk
714                $chnk->run($rank);
715                $chunk_result = $chnk;
716             }
717             #if the reset signal is received do this
718             elsif($chunk_status == $reset){
719                #do nothing
720             }
721             #if the terminate signal is received do this
722             elsif($chunk_status == $terminate){
723                   $go_mpi_status = 0;
724                   last;
725             }
726             else{
727                die "ERROR: Invalid chunk status signal\n;";
728             }
729          }
730          else{
731             die "ERROR: Invalid request status type\n";
732          }
733       }#if what I want is help or the result from a helper do this
734       elsif ($what == $need_helper || $what == $need_c_res){
735          # check c_result_status
736          my $c_res_stat;
737          MPI_Recv(\$c_res_stat, 1, MPI_INT, $root, $c_res_status, MPI_COMM_WORLD);
738
739          #if there are chunk results, do this
740          my $locs;
741          if($c_res_stat == $yes_c_res){
742             #get ids of nodes with chunk result
743             MPI_RecvII(\$locs, $root, $mpi_data, MPI_COMM_WORLD);
744
745             #get chunk results from only the root node for now
746             foreach my $loc (@{$locs}){
747                next if ($loc != $root);
748                my $c_res;
749                MPI_RecvII(\$c_res, $loc, $mpi_data, MPI_COMM_WORLD);
750                $tier->update_chunk($c_res);
751             }
752          }
753
754          #continue the rest if the node needs a helper
755          if ($what == $need_helper){
756             #send the number of helpers required
757             my $num_helpers_req = $tier->num_chunks;
758             MPI_Send(\$num_helpers_req, 1, MPI_INT, $root, $work_order, MPI_COMM_WORLD);
759
760             #see if helper is available
761             my $help_stat;
762             MPI_Recv(\$help_stat, 1, MPI_INT, $root, $request_status, MPI_COMM_WORLD);
763
764             if($help_stat == $yes_helper){
765                my $helpers;
766                MPI_RecvII(\$helpers, $root, $mpi_data, MPI_COMM_WORLD);
767
768                #send chunk to helper
769                foreach my $helper (@{$helpers}){
770                   #say I'm the one who needs help
771                   MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);
772
773                   #send go_chunk request_status
774                   MPI_Send(\$go_chunk, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD);
775
776                   #send chunk
777                   my $chnk = $tier->next_chunk;
778                   MPI_SendII(\$chnk, $helper, $mpi_data, MPI_COMM_WORLD);
779                }
780             }
781          }
782
783          #get chunk results from non root nodes since root comm has terminated
784          foreach my $loc (@{$locs}){
785             next if ($loc == $root);
786             my $c_res;
787             MPI_RecvII(\$c_res, $loc, $mpi_data, MPI_COMM_WORLD);
788             $tier->update_chunk($c_res);
789          }
790
791          #run the chunk if there is one
792          if(defined($chunk)){
793             $chunk->run($rank);
794             $tier->update_chunk($chunk);
795             $tier->run();
796             $chunk = undef;
797          }
798       }
799       #if just finished a helper chunk, inform that it is finished
800       elsif($what == $have_c_res){
801          #send the owner id of the result
802          my $owner = $chunk_result->id();
803          ($owner) = split(":", $owner);
804          MPI_Send(\$owner, 1, MPI_INT, $root, $work_order, MPI_COMM_WORLD);
805
806          #send the result
807          MPI_SendII(\$chunk_result, $owner, $mpi_data, MPI_COMM_WORLD);
808          $chunk_result = undef;
809       }
810    }
811 }
812
813 #---------ALL NODES----------
814 MPI_Finalize();                 #terminate MPI
815
816 exit(0);
817
818 #-----------------------------------------------------------------------------
819 #----------------------------------- SUBS ------------------------------------
820 #-----------------------------------------------------------------------------
821 #other things for root node to do
822 #(thread allows root to process tiers like a secondary node)
823 sub node_thread {
824    my $tier;
825    my $chunk;
826
827    if($main::no_thread){ #pause and return if no thread is needed
828        $t_need_flag = 0;
829        return;
830    }
831
832    $t_need_flag = 1;
833
834    while(not $t_terminate){
835       #load serialized tier into tier
836       if(! defined ($tier) && defined ($t_tier)){
837          $t_need_flag = 0;
838          $tier = ${thaw($t_tier)};
839          $t_tier = undef;
840          next;
841       }#process tier
842       elsif(defined($tier)){
843          #get chunk results from other nodes
844          while(my $res = shift @returned_chunks){
845             $res = ${thaw($res)};
846             $tier->update_chunk($res);
847          }
848
849          #run the tier as far as possible
850          $tier->run;
851
852          #get all chunks available
853          my $chnk = $tier->next_chunk;
854          while(my $o_chnk = $tier->next_chunk){
855             $o_chnk = freeze(\$o_chnk);
856             push (@chunks, $o_chnk);
857          }
858
859          #run chunks one at a time
860          $chnk->run($rank) if ($chnk);
861          $tier->update_chunk($chnk) if ($chnk);
862          while($chnk = shift @chunks){
863             $chnk = ${thaw($chnk)};
864             if($tier->failed){ #skip chunks after failure
865                 $tier->update_chunk($chnk);
866                 next;
867             }
868
869             $chnk->run($rank);
870             $tier->update_chunk($chnk);
871          }
872
873          #let tier advance if possible
874          $tier->run();
875
876          #terminate tier, wait, or continue
877          if($tier->terminated){
878             my $tier_result;
879             $tier_result->{-error} = $tier->error;
880             $tier_result->{-failed} = $tier->failed;
881             $tier_result->{-DS} = $tier->DS;
882             $tier_result->{-fasta} = $tier->fasta if ($tier->failed);
883
884             sleep 1 while (defined ($t_tier_result)); #pause incase result will be overwritten
885             $t_tier_result = freeze(\$tier_result);
886             $tier = undef;
887             $t_need_flag = 1;
888          }#take a break
889          elsif($tier->num_chunks == 0){
890             #keeps thread from hogging resources while waiting for external results
891             sleep 1;
892          }
893
894          next;
895       }#load serialized chunk into chunk
896       elsif(! defined ($chunk) && defined ($t_chunk)){
897          $t_need_flag = 0;
898          $chunk = ${thaw($t_chunk)};
899          $t_chunk = undef;
900          next;
901       }#process chunk
902       elsif(defined($chunk)){
903          $chunk->run($rank);
904          sleep 1 while (defined ($t_chunk_result)); #pause incase result will be overwritten
905          $t_chunk_result = freeze(\$chunk);
906          $chunk = undef;
907          $t_need_flag = 1;
908          next;
909       }#take a break
910       else{
911          #keeps thread form hogging resources when there is nothing to do
912          sleep 1;
913       }
914    }
915 }
916 #----------------------------------------------------------------------------
917 #easy dump of string to a tempfile
918 sub totemp{
919    my $data = shift @_;
920
921    my ($fh, $name) = tempfile();
922    print $fh $data;
923    close ($fh);
924
925    return $name;
926 }
927 #----------------------------------------------------------------------------
928 #sends scalar variable contents via serialization
929 #scalar can hold ref to other data structures
930 sub MPI_SendII{
931    my $msg = shift @_;
932    my $target = shift @_;
933    my $tag = shift @_;
934    my $communicator = shift @_;
935
936    my $send = freeze($msg);
937    my $length = length($send);
938
939    MPI_Send(\$length, 1, MPI_INT, $target, $message_length, $communicator);
940    MPI_Send(\$send, $length, MPI_CHAR, $target, $tag, $communicator);
941
942 }
943 #----------------------------------------------------------------------------
944 #receives serialized scalar variable
945 #scalar can hold ref to other data structures
946 sub MPI_RecvII{
947    my $ref = shift @_;
948    my $source = shift @_;
949    my $tag = shift @_;
950    my $communicator = shift @_;
951
952    my $length;
953    my $recv;
954
955
956    MPI_Recv(\$length, 1, MPI_INT, $source, $message_length, $communicator);
957    MPI_Recv(\$recv, $length, MPI_CHAR, $source, $tag, $communicator); #receive line
958
959    ${$ref} = ${thaw($recv)};
960 }
Note: See TracBrowser for help on using the browser.