root/MPI/mpi_evaluator

Revision 244, 24.7 kB (checked in by cholt, 3 months ago)

bug in repeat mask caused by prok upgrade

  • Property svn:executable set to *
Line 
1 #!/usr/bin/perl -w
2
3 eval 'exec /usr/bin/perl -w -S $0 ${1+"$@"}'
4     if 0; # not running under some shell
5
6 use strict "vars";
7 use strict "refs";
8
9 use FindBin;
10 use lib "$FindBin::Bin/../lib";
11 use lib "$FindBin::Bin/../perl/lib";
12 use vars qw($RANK $LOG $CMD_ARGS);
13 use Storable qw(freeze thaw);
14 use threads;
15 use threads::shared;
16
BEGIN{
   # Mark this process as the evaluator (flag lives in package main).
   $main::eva = 1;

   # Fall back to the ontology files bundled under ../lib/CGL when the
   # environment does not already name them (empty/false values count as unset).
   $ENV{CGL_SO_SOURCE} ||= "$FindBin::Bin/../lib/CGL/so.obo";
   $ENV{CGL_GO_SOURCE} ||= "$FindBin::Bin/../lib/CGL/gene_ontology.obo";

   # Snapshot the raw command line; BEGIN runs before GetOptions rewrites @ARGV.
   $CMD_ARGS = join(' ', @ARGV);

   # ^C: report, release every thread, and exit with status 1.
   $SIG{'INT'} = sub {
      print STDERR "\n\nAborted by user!!\n\n";
      $_->detach for threads->list;
      exit (1);
   };

   # Drop the two Storable warnings about code references it cannot
   # serialize; every other warning is passed through unchanged.
   $SIG{'__WARN__'} = sub {
      my $warning = $_[0];
      return if $warning =~ /Not a CODE reference/;
      return if $warning =~ /Can\'t store item CODE/;
      warn $warning;
   };

   # On die: when a log object exists, record the dying rank and the bumped
   # die count, detach all threads, then re-die with a banner wrapped around
   # the original message.
   # NOTE: Perl also calls this hook for dies inside eval/try blocks.
   $SIG{'__DIE__'} = sub {
      my $reason = $_[0];

      if (defined($LOG) && defined($reason)) {
         my $count = $LOG->get_die_count();
         $count++;

         $LOG->add_entry("DIED", "RANK",  $RANK);
         $LOG->add_entry("DIED", "COUNT", $count);
      }

      $_->detach for threads->list;

      die "#----------------------\n",
          "FATAL: failed!!\n",
          "#----------------------\n",
          $reason . "\n";
   };
}
68
69 use Cwd;
70 use Storable;
71 use FileHandle;
72 use File::Path;
73 use Getopt::Long qw(:config no_ignore_case);
74 use File::Temp qw(tempfile tempdir);
75 use Bio::DB::Fasta;
76 use GI;
77 use Dumper::GFF::GFFV3;
78 use Iterator::Any;
79 use Iterator::Fasta;
80 use Iterator::GFF3;
81 use Fasta;
82 use FastaChunker;
83 use maker::auto_annotator;
84 use cluster;
85 use repeat_mask_seq;
86 use runlog;
87 use ds_utility;
88 use GFFDB;
89 use Error qw(:try);
90 use Error::Simple;
91 use Process::MpiChunk;
92 use Process::MpiTiers;
93 use Parallel::MPIcar qw(:all);
94
# Refuse to run with a threads.pm older than 1.67.
$threads::VERSION >= 1.67
   or die "Program requires threads version 1.67 or greater\n",
          "You have version ". $threads::VERSION ."\n";

#--MPI_Init requires there to be arguments on @ARGV (a logic problem in
#--the package), so a placeholder argument is supplied when none were given
#--and removed again once MPI is initialised.
{
   my $added_placeholder = 0;
   if (!@ARGV) {
      push(@ARGV, 'null');
      $added_placeholder = 1;
   }

   MPI_Init();                  #initiate the MPI

   shift @ARGV if $added_placeholder;
}
111
$| = 1; #autoflush the selected output handle (STDOUT) so status shows promptly

#usage text printed for -help and when no control files can be found
my $usage = "
Usage:

     mpi_evaluator [options] <eval_opts> <eval_bopts> <eval_exe>


Options:

     -genome_gff <file>  Specify the maker gff file to evaluate.

     -model_gff  <file>  Specify the external gff file to evaluate.

     -genome     <file>  Specify the genome fasta file.  This is optional if the
                         fasta entries are also found in the gff file.

     -RM_off|R           Turns all repeat masking off.

     -retry   <integer>  Rerun failed contigs up to the specified count.

     -cpus|c  <integer>  Tells how many cpus to use for BLAST analysis.

     -force|f            Forces program to delete old files before running again.
                         This will require all blast analyses to be rerun.

     -again|a            Calculate all output files again even if no settings have
                         changed.

     -quiet|q            Silences most of the status messages.

     -CTL                Generate empty control files in the current directory.

     -help|?             Prints this usage statement.


";
149
150 #-------------------------------------------------------------------------------
151 #------------------------------------ MAIN -------------------------------------
152 #-------------------------------------------------------------------------------
153
#--let Storable serialize objects that contain code refs
$Storable::forgive_me = 1;

#------INITIATE MPI VARIABLES------
my $rank = MPI_Comm_rank(MPI_COMM_WORLD); #rank of this process
my $size = MPI_Comm_size(MPI_COMM_WORLD); #number of processes in the job
$RANK = $rank; #package-global copy, read by the __DIE__ handler

#MPI SIGNAL CODES
#--message tags ($work_order is the generic data tag)
my ($who_I_am,       $what_I_want,  $result_status,
    $request_status, $c_res_status, $chunk_status,
    $work_order,     $mpi_data,     $message_length)
   = (1111, 2222, 3333, 4444, 5555, 6666, 7777, 8888, 9999);

#--signal values are plain variables rather than constants because
#--MPI_Send is handed references to them (e.g. \$go_chunk)

#--what_I_want type signals
my ($need_tier, $need_helper, $have_c_res, $need_c_res) = (1, 2, 3, 4);

#--request_status signals
my ($terminate, $wait_as_helper, $yes_tier, $yes_helper,
    $no_helper, $go_chunk,       $reset) = (0 .. 6);

#--results_status signals
my ($no_result, $yes_result) = (0, 1);

#--c_res_status signals
my ($no_c_res, $yes_c_res) = (0, 1);

#--chunk_status signals
my ($no_chunk, $yes_chunk) = (0, 1);

#---bookkeeping kept by the root node's MPI loop
my (@c_results, @failed, @res_loc, @helper_stack, @active);

#---state shared between the root's MPI loop and its worker thread
my @chunks          :shared;
my @returned_chunks :shared;
my $t_need_flag     :shared;
my $t_tier          :shared;
my $t_tier_result   :shared;
my $t_chunk         :shared;
my $t_chunk_result  :shared;
my $t_terminate     :shared;

#---global variables
my %OPT;
my $root = 0; #rank of the root node (only changed for debugging)
219
#---Process options on the command line
#--GetOptions only warns about unknown or malformed options and returns
#--false, so its return value is checked to make the catch below fire.
try{
    GetOptions("RM_off|R" => \$OPT{R},
               "force|f" => \$OPT{force},
               "genome_gff=s" => \$OPT{genome_gff},
               "genome=s" => \$OPT{genome},
               "model_gff=s" => \$OPT{model_gff},
               "cpus|c=i" => \$OPT{cpus},
               "retry=i" =>\$OPT{retry},
               "again|a" =>\$OPT{again},
               "quiet|q" =>\$main::quiet,
               "CTL" => sub {GI::generate_control_files() if($rank == $root); MPI_Finalize(); exit(0);},
               "help|?" => sub {print $usage if($rank == $root); MPI_Finalize(); exit(0)}
               ) or die "ERROR: Invalid command line options\n";
}
catch Error::Simple with{
    my $E = shift;

    print STDERR $E->{-text};
    die "\n\nFailed parsing command line options!!\n\n";
};
241
242 #--------------------------------------
243 #---------PRIMARY MPI PROCCESS---------
244 #--------------------------------------
245
#--check if root node
#--The root loads the control files and contig iterator, then runs an MPI
#--dispatch loop: it hands tiers (one per contig) to secondary nodes and to
#--its own worker thread (node_thread), brokers helper nodes for chunked
#--work, and relays chunk results to the node that owns them.
if ($rank == $root) {
   #variables that must persist outside of the try block
   my %CTL_OPT;
   my $iterator;
   my $DS_CTL;
   my $GFF_DB;
   my $build;

   try{
      #get control files off the command line
      my @ctlfiles = @ARGV;

      #fall back to the default control file names in the working directory
      if (not @ctlfiles) {
         if (-e "eval_opts.ctl" &&
             -e "eval_bopts.ctl" &&
             -e "eval_exe.ctl"
            ) {

            @ctlfiles = ("eval_opts.ctl",
                         "eval_bopts.ctl",
                         "eval_exe.ctl"
                        );
         }
         else {
            print STDERR  "ERROR: Control files not found\n";
            print $usage;
            exit(0);
         }
      }

      #--Control file processing

      #set up control options from control files
      %CTL_OPT = GI::load_control_files(\@ctlfiles, \%OPT, $size);

      #--open datastructure controller
      $DS_CTL = ds_utility->new(\%CTL_OPT);

      #--set up gff database
      $GFF_DB = GFFDB->new(\%CTL_OPT);
      $build = $GFF_DB->next_build;

      #---load genome multifasta/GFF3 file
      $iterator = Iterator::Any->new( -fasta => $CTL_OPT{'genome'},
                                      -gff => $CTL_OPT{'genome_gff'},
                                    );
   }
   catch Error::Simple with{
      my $E = shift;
      print STDERR $E->{-text};
      print STDERR "\n\nFailed while examining startup data\n",
                   "(control files and input fasta files)!!\n\n";
      my $code = 2;
      $code = $E->{-value} if (defined($E->{-value}));

      exit($code);
   };

   #build indexes of databases
   #Shared_Functions::build_all_indexes($CTL_OPT{old_protein},
   #                                   $CTL_OPT{old_est}
   #                                  );

   #--record a finished tier's result: store its datastructure entry and
   #--queue the contig for another attempt if the tier reported failure
   my $absorb_tier_result = sub {
      my $res = shift;

      $DS_CTL->add_entry($res->{-DS});
      push(@failed, $res->{-fasta}) if ($res->{-failed});
   };

   #--build tiers from the iterator (then from the failed list) until one is
   #--not already terminated; returns that tier, or undef when none is left
   my $next_tier = sub {
      my $owner = shift;

      while (my $fasta = $iterator->nextFasta() || shift @failed){
         my $tier = Process::MpiTiers->new({fasta   => $fasta,
                                            CTL_OPT => \%CTL_OPT,
                                            DS_CTL  => $DS_CTL,
                                            GFF_DB  => $GFF_DB,
                                            build   => $build},
                                           $owner
                                          );

         return $tier if(! $tier->terminated);
      }

      return;
   };

   #====ACTUAL MPI COMMUNICATION

   #thread lets the root node process tiers/chunks too, not just manage MPI
   my $thr = threads->create(\&node_thread);
   my $go_mpi_status = 1;
   $t_need_flag = 1;

   while($go_mpi_status){
      #====INTERNAL TIER THREAD
      #collect a finished tier result from the internal thread
      if (defined($t_tier_result)){
         my $t_res = ${thaw($t_tier_result)};
         $t_tier_result = undef;
         $active[$root] = 0;

         $absorb_tier_result->($t_res);
      }

      #collect a helper-chunk result from the internal thread and note that
      #the chunk's owner has a result waiting on the root
      if (defined($t_chunk_result)){
         my $chunk = ${thaw($t_chunk_result)};
         $t_chunk_result = undef;
         my ($id) = split(":", $chunk->id()); #id is prefixed with the owner rank
         push (@{$c_results[$id]}, $chunk);
         unshift (@{$res_loc[$id]}, $root);
      }

      #hand chunks published by the internal thread to parked helper nodes
      while((@helper_stack > 0) && (@chunks > 0) && (my $chunk = shift @chunks)){
         my $helper = shift @helper_stack;
         $chunk = ${thaw($chunk)};

         #tell helper node I need help
         MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);

         #tell helper node a chunk is coming
         MPI_Send(\$go_chunk, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD );

         #send the chunk
         MPI_SendII(\$chunk, $helper, $mpi_data, MPI_COMM_WORLD);
      }

      #get tier for internal thread
      if($t_need_flag > 0){
         my $tier = $next_tier->($root);

         if(defined $tier){
            $t_need_flag = 0;
            $t_tier = freeze(\$tier);
            $active[$root] = 1;
         }
         else{
            $t_need_flag = 2; #idle: may take either a tier or a chunk
         }
      }

      #take a node out of limbo if there are failed contigs to retry
      if(@helper_stack && @failed){
         my $helper = shift @helper_stack;

         #tell helper node who I am
         MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);

         #tell helper node that no chunk is coming (resets it to ask for a tier)
         MPI_Send(\$reset, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD );
      }

      #work with mpi nodes or skip mpi if all nodes are waiting in limbo
      if (@helper_stack < $size - 1){
         my $who;
         my $what;
         my $rs_type;

         #see who asks for a file (-2 appears to be the any-source rank)
         MPI_Recv(\$who, 1,  MPI_INT, -2, $who_I_am, MPI_COMM_WORLD);

         #see what the mpi node wants
         MPI_Recv(\$what, 1, MPI_INT, $who, $what_I_want, MPI_COMM_WORLD);

         #if the node wants a tier to process, do this
         if($what == $need_tier){
            #receive result status
            MPI_Recv(\$rs_type, 1,  MPI_INT, $who, $result_status, MPI_COMM_WORLD);

            #get result if available
            if($rs_type == $yes_result){
               my $result;
               MPI_RecvII(\$result, $who, $mpi_data, MPI_COMM_WORLD);
               $absorb_tier_result->($result);
            }

            #send a tier if a contig is available, otherwise park the node
            my $tier = $next_tier->($who);

            if(defined $tier){
               #say tier is available and send it
               MPI_Send(\$yes_tier, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
               MPI_SendII(\$tier, $who, $mpi_data, MPI_COMM_WORLD );
               $active[$who] = 1;
            }
            else{
               MPI_Send(\$wait_as_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
               push(@helper_stack, $who);
               $active[$who] = 0;
            }
         }
         #if the node wants a helper or needs a chunk result, do this
         elsif($what == $need_helper || $what == $need_c_res){
            #--first send c_res_status
            # send ids of nodes with chunk results
            if(defined ($res_loc[$who])){
               MPI_Send(\$yes_c_res, 1, MPI_INT, $who, $c_res_status, MPI_COMM_WORLD);
               MPI_SendII(\$res_loc[$who], $who, $mpi_data, MPI_COMM_WORLD);

               my @locs = @{$res_loc[$who]};
               $res_loc[$who] = undef;

               #if primary node has chunk results to send then send them
               while (defined(my $loc = shift @locs)){
                  if ($loc == $root){
                     my $res = shift @{$c_results[$who]};
                     MPI_SendII(\$res, $who, $mpi_data, MPI_COMM_WORLD);
                  }
               }
            }
            #no one has anything yet
            else{
               MPI_Send(\$no_c_res, 1, MPI_INT, $who, $c_res_status, MPI_COMM_WORLD);
            }

            #continue the rest if the node needs a helper
            if($what == $need_helper){
               #find the number of helpers required
               my $num_helpers_req;
               MPI_Recv(\$num_helpers_req, 1, MPI_INT, $who, $work_order, MPI_COMM_WORLD);

               #number of secondary node helpers available
               my $sec_node_avail = @helper_stack;

               #number of primary node threads available
               my $thr_avail = ($t_need_flag == 2 && ! defined $t_chunk) ? 1 : 0;

               #signal that no helpers are available
               if($sec_node_avail == 0 && $thr_avail == 0){
                  MPI_Send(\$no_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
               }
               else{#if node helpers are available
                  #helpers to send
                  my $helpers = [];

                  #secondary node helpers
                  if($sec_node_avail > 0){
                     #separate the helpers
                     while(@{$helpers} < $num_helpers_req && @helper_stack > 0){
                        my $helper = shift @helper_stack;
                        push(@{$helpers}, $helper);
                     }

                     $num_helpers_req -= @{$helpers};
                  }

                  #primary node thread helper
                  my $root_helper_flag = 0;
                  if ($thr_avail && $num_helpers_req > 0){
                     #always make root node first
                     unshift(@{$helpers}, $root);
                     $root_helper_flag = 1;
                  }

                  #say helper is available and send ids of the helpers
                  MPI_Send(\$yes_helper, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD);
                  MPI_SendII(\$helpers, $who, $mpi_data, MPI_COMM_WORLD);

                  #take chunk as a helper
                  if($root_helper_flag){
                     #see who's the one who needs help
                     my $who2;
                     MPI_Recv(\$who2, 1,  MPI_INT, $who, $who_I_am, MPI_COMM_WORLD);

                     #get go_chunk request_status
                     my $req_stat;
                     MPI_Recv(\$req_stat, 1, MPI_INT, $who2, $request_status, MPI_COMM_WORLD );
                     if ($req_stat == $go_chunk){
                        #get the chunk and hand it to the internal thread
                        my $chnk;
                        MPI_RecvII(\$chnk, $who2, $mpi_data, MPI_COMM_WORLD);

                        $t_need_flag = 0;
                        $t_chunk = freeze(\$chnk);
                     }
                     elsif($req_stat == $reset){
                        #do nothing
                     }
                     else{
                        die "ERROR: Logic error in getting chunk as a helper\n";
                     }
                  }
               }
            }
         }
         #if the node has a chunk result, do this
         elsif($what == $have_c_res){
            #get the owner of the result
            my $owner;
            MPI_Recv(\$owner, 1, MPI_INT, $who, $work_order, MPI_COMM_WORLD);

            if($owner == $root){#if root is owner get result
               my $chunk_res;
               MPI_RecvII(\$chunk_res, $who, $mpi_data, MPI_COMM_WORLD);
               push(@returned_chunks, freeze(\$chunk_res));
            }
            else{#take note of owner to tell him he has a result waiting
               push(@{$res_loc[$owner]}, $who);
            }
         }
         #if what the node wants is something else
         else{
            die "ERROR: Invalid request type\n";
         }
      }
      else{ #take a break if mpi was skipped
         #this keeps the root node from hogging resources if only the thread is active
         sleep 2;
      }

      #see if all contigs are finished: continue while any secondary node is
      #not parked as a helper, any node is marked active, or contigs remain.
      #(The parked-helper test is independent of @active, so it is no longer
      # evaluated inside a loop over @active, which skipped it when empty.)
      my $nodes_busy = (@helper_stack < $size - 1)
                       || grep {defined($_) && $_ == 1} @active;
      my $contigs_left = (! $iterator->finished || @failed > 0);
      $go_mpi_status = ($nodes_busy || $contigs_left) ? 1 : 0;
   }

   #---tell mpi nodes to terminate
   for(my $i = 1; $i < $size; $i++){
      #tell nodes waiting as helpers who I am
      MPI_Send(\$rank, 1,  MPI_INT, $i, $who_I_am, MPI_COMM_WORLD);

      #send termination signal
      MPI_Send(\$terminate, 1, MPI_INT, $i, $request_status, MPI_COMM_WORLD);
   }

   #---release thread
   $t_terminate = 1; #signals to thread to clean up
   $thr->detach() unless($thr->is_detached);

   print STDERR "\n\nProgram is now finished!!!\n\n";
}
#------SECONDARY MPI PROCESSES------
#--Each secondary node repeatedly tells the root what it needs: a new tier,
#--helpers for its tier's chunks, chunk results, or to hand back a chunk it
#--ran for another node. When no tier is available it parks as a helper.
else {
   my $go_mpi_status = 1;
   my $tier_result;  #result of the last finished tier, sent with the next tier request
   my $tier;         #tier this node currently owns
   my $chunk_result; #finished chunk run on behalf of another node

   while ($go_mpi_status) {
      #tell the primary process what node it is speaking to
      MPI_Send(\$rank, 1, MPI_INT, $root, $who_I_am, MPI_COMM_WORLD );

      #decide what this node needs
      my $what;
      my $chunk;

      if(defined $chunk_result){
         $what = $have_c_res;
      }
      elsif(!defined($tier) || $tier->terminated){
         $what = $need_tier;
         if(defined($tier)){
            #collect errors and failures if any
            $tier_result->{-error} = $tier->error;
            $tier_result->{-failed} = $tier->failed;
            $tier_result->{-DS} = $tier->DS;
            $tier_result->{-fasta} = $tier->fasta if($tier->failed);
            $tier = undef;
         }
      }
      elsif(($chunk = $tier->next_chunk) && ($tier->num_chunks > 0)){
         #more chunks remain after this one: ask for helpers
         $what = $need_helper;
      }
      else{
         #at most one chunk to run locally: just collect waiting results
         $what = $need_c_res;
      }

      #--tell primary node what this node needs
      MPI_Send(\$what, 1, MPI_INT, $root, $what_I_want, MPI_COMM_WORLD );

      #if what I want is a tier do this
      if($what == $need_tier){
         #Send result status
         my $rs_type = (defined($tier_result)) ? $yes_result: $no_result;
         MPI_Send(\$rs_type, 1, MPI_INT, $root, $result_status, MPI_COMM_WORLD );

         #Send result if available
         if($rs_type == $yes_result){
            MPI_SendII(\$tier_result, $root, $mpi_data, MPI_COMM_WORLD);
            $tier_result = undef;
         }

         #get request_status for the tier
         my $req_status;
         MPI_Recv(\$req_status, 1, MPI_INT, $root, $request_status, MPI_COMM_WORLD );

         #get tier and run it if there is one
         if($req_status == $yes_tier){
            MPI_RecvII(\$tier, $root, $mpi_data, MPI_COMM_WORLD );
            $tier->run;
         }#wait as helper if asked to
         elsif($req_status == $wait_as_helper){
            #see who needs help (-2 appears to be the any-source rank)
            my $who;
            MPI_Recv(\$who, 1,  MPI_INT, -2, $who_I_am, MPI_COMM_WORLD);

            #get request_status for chunk (local name avoids shadowing the
            #file-level $chunk_status tag variable)
            my $chunk_stat;
            MPI_Recv(\$chunk_stat, 1, MPI_INT, $who, $request_status, MPI_COMM_WORLD );

            #if there is a chunk do this
            if($chunk_stat == $go_chunk){
               #get chunk to process
               my $chnk;
               MPI_RecvII(\$chnk, $who, $mpi_data, MPI_COMM_WORLD);

               #run chunk
               $chnk->run($rank);
               $chunk_result = $chnk;
            }
            #if the reset signal is received do this
            elsif($chunk_stat == $reset){
               #do nothing: loop around and ask the root for a tier again
            }
            #if the terminate signal is received do this
            elsif($chunk_stat == $terminate){
               $go_mpi_status = 0;
               last;
            }
            else{
               die "ERROR: Invalid chunk status signal\n";
            }
         }
         else{
            die "ERROR: Invalid request status type\n";
         }
      }#if what I want is help or the result from a helper do this
      elsif ($what == $need_helper || $what == $need_c_res){
         # check c_result_status
         my $c_res_stat;
         MPI_Recv(\$c_res_stat, 1, MPI_INT, $root, $c_res_status, MPI_COMM_WORLD);

         #if there are chunk results, do this
         my $locs;
         if($c_res_stat == $yes_c_res){
            #get ids of nodes with chunk result
            MPI_RecvII(\$locs, $root, $mpi_data, MPI_COMM_WORLD);

            #get chunk results from only the root node for now
            foreach my $loc (@{$locs}){
               next if ($loc != $root);
               my $c_res;
               MPI_RecvII(\$c_res, $loc, $mpi_data, MPI_COMM_WORLD);
               $tier->update_chunk($c_res);
            }
         }

         #continue the rest if the node needs a helper
         if ($what == $need_helper){
            #send the number of helpers required
            my $num_helpers_req = $tier->num_chunks;
            MPI_Send(\$num_helpers_req, 1, MPI_INT, $root, $work_order, MPI_COMM_WORLD);

            #see if helper is available
            my $help_stat;
            MPI_Recv(\$help_stat, 1, MPI_INT, $root, $request_status, MPI_COMM_WORLD);

            if($help_stat == $yes_helper){
               my $helpers;
               MPI_RecvII(\$helpers, $root, $mpi_data, MPI_COMM_WORLD);

               #send chunk to helper
               foreach my $helper (@{$helpers}){
                  #say I'm the one who needs help
                  MPI_Send(\$rank, 1,  MPI_INT, $helper, $who_I_am, MPI_COMM_WORLD);

                  #send go_chunk request_status
                  MPI_Send(\$go_chunk, 1, MPI_INT, $helper, $request_status, MPI_COMM_WORLD);

                  #send chunk
                  my $chnk = $tier->next_chunk;
                  MPI_SendII(\$chnk, $helper, $mpi_data, MPI_COMM_WORLD);
               }
            }
         }

         #get chunk results from non root nodes now that root comm has finished
         foreach my $loc (@{$locs || []}){
            next if ($loc == $root);
            my $c_res;
            MPI_RecvII(\$c_res, $loc, $mpi_data, MPI_COMM_WORLD);
            $tier->update_chunk($c_res);
         }

         #run the chunk if there is one
         if(defined($chunk)){
            $chunk->run($rank);
            $tier->update_chunk($chunk);
            $tier->run();
            $chunk = undef;
         }
      }
      #if just finished a helper chunk, inform that it is finished
      elsif($what == $have_c_res){
         #send the owner id of the result (id is prefixed with the owner rank)
         my $owner = $chunk_result->id();
         ($owner) = split(":", $owner);
         MPI_Send(\$owner, 1, MPI_INT, $root, $work_order, MPI_COMM_WORLD);

         #send the result
         MPI_SendII(\$chunk_result, $owner, $mpi_data, MPI_COMM_WORLD);
         $chunk_result = undef;
      }
   }
}
775
#---------ALL NODES----------
#root and secondary ranks both fall through to here: shut MPI down, then exit
MPI_Finalize();
exit 0;
780
781 #-----------------------------------------------------------------------------
782 #----------------------------------- SUBS ------------------------------------
783 #-----------------------------------------------------------------------------
784 #other things for root node to do
785 #(thread allows root to process tiers like a secondary node)
sub node_thread {
   # Worker loop for the root node's thread. It owns at most one tier or one
   # helper chunk at a time and exchanges work with the MPI loop through the
   # :shared variables ($t_tier, $t_chunk, @chunks, @returned_chunks, ...).
   my ($tier, $chunk);
   $t_need_flag = 1;

   until ($t_terminate) {
      if (!defined($tier) && defined($t_tier)) {
         # a tier was handed over by the MPI loop: claim it
         $t_need_flag = 0;
         $tier = ${thaw($t_tier)};
         $t_tier = undef;
      }
      elsif (defined($tier)) {
         # fold in chunk results that helper nodes sent back
         while (my $frozen = shift @returned_chunks) {
            $tier->update_chunk(${thaw($frozen)});
         }

         # advance the tier as far as it can go
         $tier->run;

         # keep the first chunk here; publish the rest so the MPI loop can
         # hand them to idle helper nodes
         my $mine = $tier->next_chunk;
         while (my $other = $tier->next_chunk) {
            push(@chunks, freeze(\$other));
         }

         if ($mine) {
            $mine->run($rank);
            $tier->update_chunk($mine);
         }

         # take back whatever no helper picked up and run it locally;
         # after a failure the chunks are returned to the tier unrun
         while (my $frozen = shift @chunks) {
            my $pending = ${thaw($frozen)};
            $pending->run($rank) unless $tier->failed;
            $tier->update_chunk($pending);
         }

         # let the tier advance if possible
         $tier->run();

         if ($tier->terminated) {
            # package the outcome for the MPI loop
            my $tier_result = {};
            $tier_result->{-error}  = $tier->error;
            $tier_result->{-failed} = $tier->failed;
            $tier_result->{-DS}     = $tier->DS;
            $tier_result->{-fasta}  = $tier->fasta if ($tier->failed);

            # do not overwrite a result the MPI loop has not consumed yet
            sleep 1 while (defined $t_tier_result);
            $t_tier_result = freeze(\$tier_result);
            $tier = undef;
            $t_need_flag = 1;
         }
         elsif ($tier->num_chunks == 0) {
            # waiting on results from other nodes; avoid spinning
            sleep 1;
         }
      }
      elsif (!defined($chunk) && defined($t_chunk)) {
         # a helper chunk was handed over by the MPI loop: claim it
         $t_need_flag = 0;
         $chunk = ${thaw($t_chunk)};
         $t_chunk = undef;
      }
      elsif (defined($chunk)) {
         # run the helper chunk and publish the result
         $chunk->run($rank);
         sleep 1 while (defined $t_chunk_result);
         $t_chunk_result = freeze(\$chunk);
         $chunk = undef;
         $t_need_flag = 1;
      }
      else {
         # nothing to do; avoid spinning
         sleep 1;
      }
   }
}
873 #----------------------------------------------------------------------------
874 #easy dump of string to a tempfile
sub totemp{
   # Write a string to a new temporary file and return the file's path.
   # The file is created by File::Temp::tempfile without UNLINK, so it is
   # not removed automatically; the caller owns cleanup.
   # Dies if the data cannot be written or flushed, so callers never receive
   # a silently truncated file.
   my $data = shift @_;

   my ($fh, $name) = tempfile();
   print {$fh} $data
      or die "ERROR: Could not write to temp file $name: $!\n";
   close($fh)
      or die "ERROR: Could not close temp file $name: $!\n";

   return $name;
}
884 #----------------------------------------------------------------------------
885 #sends scalar variable contents via serialization
886 #scalar can hold ref to other data structures
sub MPI_SendII{
   # Serialize whatever $msg references (a scalar ref, which may itself hold
   # a ref to a larger structure) and send it in two messages: the byte
   # length on the $message_length tag, then the frozen bytes on $tag.
   my ($msg, $target, $tag, $communicator) = @_;

   my $frozen  = freeze($msg);
   my $n_bytes = length $frozen;

   # the receiver needs the size first so it knows how much to receive
   MPI_Send(\$n_bytes, 1, MPI_INT, $target, $message_length, $communicator);
   MPI_Send(\$frozen, $n_bytes, MPI_CHAR, $target, $tag, $communicator);
}
900 #----------------------------------------------------------------------------
901 #receives serialized scalar variable
902 #scalar can hold ref to other data structures
sub MPI_RecvII{
   # Counterpart of MPI_SendII: receive the byte length on the
   # $message_length tag, then that many bytes on $tag, thaw them, and store
   # the reconstructed value through $ref.
   my ($ref, $source, $tag, $communicator) = @_;

   my ($n_bytes, $frozen);
   MPI_Recv(\$n_bytes, 1, MPI_INT, $source, $message_length, $communicator);
   MPI_Recv(\$frozen, $n_bytes, MPI_CHAR, $source, $tag, $communicator); #receive line

   my $thawed = thaw($frozen);
   ${$ref} = ${$thawed};
}
Note: See TracBrowser for help on using the browser.