Module: Workflow
- Includes:
- InputModule
- Included in:
- RemoteWorkflow
- Defined in:
- lib/rbbt/workflow/doc.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/examples.rb,
lib/rbbt/workflow/refactor.rb,
lib/rbbt/workflow/util/data.rb,
lib/rbbt/workflow/definition.rb,
lib/rbbt/workflow/util/trace.rb,
lib/rbbt/workflow/dependencies.rb,
lib/rbbt/workflow/refactor/inputs.rb,
lib/rbbt/workflow/integration/cromwell.rb,
lib/rbbt/workflow/integration/nextflow.rb
Defined Under Namespace
Modules: Data, DependencyBlock
Constant Summary
- DEFAULT_NAME =
Task::DEFAULT_NAME
- FORGET_DEP_TASKS =
ENV["RBBT_FORGET_DEP_TASKS"] == "true"
- REMOVE_DEP_TASKS =
ENV["RBBT_REMOVE_DEP_TASKS"] == "true"
- OUTPUT_FIELDS =
%w(outdir output)
Instance Attribute Summary
-
#documentation ⇒ Object
Returns the value of attribute documentation.
-
#example_dir ⇒ Object
Returns the value of attribute example_dir.
-
#remote_tasks ⇒ Object
Returns the value of attribute remote_tasks.
Class Method Summary
- .doc_parse_chunks(str, pattern) ⇒ Object
- .doc_parse_first_line(str) ⇒ Object
- .doc_parse_up_to(str, pattern, keep = false) ⇒ Object
- .job_path?(path) ⇒ Boolean
- .load_inputs(dir, names, types) ⇒ Object
- .nextflow_file_params(file) ⇒ Object
- .nextflow_includes(file) ⇒ Object
- .nextflow_recursive_params(file) ⇒ Object
- .original_require_workflow ⇒ Object
- .parse_nextflow_schema(file) ⇒ Object
- .parse_workflow_doc(doc) ⇒ Object
- .plot_trace_job_times(data, plot, width = 800, height = 800) ⇒ Object
- .require_remote_workflow(wf_name, url) ⇒ Object
- .require_workflow(wf_name, force_local = true) ⇒ Object
- .trace(seed_jobs, options = {}) ⇒ Object
- .trace_job_summary(jobs, report_keys = []) ⇒ Object
- .trace_job_times(jobs, fix_gap = false, report_keys = nil) ⇒ Object
- .workflow_dir ⇒ Object
- .workflow_for(path) ⇒ Object
- .workflow_repo ⇒ Object
Instance Method Summary
- #assign_dep_inputs(_inputs, options, all_d, task_info) ⇒ Object
- #dep(*dependency, &block) ⇒ Object
- #dep_task(name, workflow, oname, *rest, &block) ⇒ Object (also: #task_alias)
- #desc(description) ⇒ Object
- #documentation_markdown ⇒ Object
- #example(task_name, example) ⇒ Object
- #example_inputs(task_name, example) ⇒ Object
- #example_step(task_name, example = "Example", new_inputs = {}) ⇒ Object
- #examples ⇒ Object
- #export_asynchronous(*names) ⇒ Object (also: #export)
- #export_exec(*names) ⇒ Object
- #export_stream(*names) ⇒ Object
- #export_synchronous(*names) ⇒ Object
- #extension(extension) ⇒ Object
- #fast_load_id(id) ⇒ Object (also: #load_id)
- #helper(name, *args, &block) ⇒ Object
- #id_for(path) ⇒ Object
- #import(source, *args) ⇒ Object
- #load_cromwell(file) ⇒ Object
- #load_documentation ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #nextflow(path, *args) ⇒ Object
- #nextflow_dir(path, output = nil) ⇒ Object
- #nextflow_file(file, name = nil, output = nil) ⇒ Object
- #nextflow_project(project, *args) ⇒ Object
- #override_dependencies(inputs) ⇒ Object
- #real_dependencies(task, orig_jobname, inputs, dependencies) ⇒ Object
- #rec_dependencies(taskname, seen = []) ⇒ Object
- #rec_input_defaults(taskname) ⇒ Object
- #rec_input_descriptions(taskname) ⇒ Object
- #rec_input_options(taskname) ⇒ Object
- #rec_input_types(taskname) ⇒ Object
- #rec_input_use(taskname) ⇒ Object
- #rec_inputs(taskname) ⇒ Object
- #resumable ⇒ Object
- #returns(description) ⇒ Object
- #setup_override_dependency(dep, workflow, task_name) ⇒ Object
- #task(name, &block) ⇒ Object
- #task_exports ⇒ Object
- #task_for(path) ⇒ Object
- #task_from_dep(dep) ⇒ Object
- #task_info(name) ⇒ Object
- #unexport(*names) ⇒ Object
- #unlocated_override?(dep) ⇒ Boolean
Methods included from InputModule
Instance Attribute Details
#documentation ⇒ Object
Returns the value of attribute documentation.
# File 'lib/rbbt/workflow/doc.rb', line 71

def documentation
  @documentation
end
#example_dir ⇒ Object
Returns the value of attribute example_dir.
# File 'lib/rbbt/workflow/examples.rb', line 2

def example_dir
  @example_dir
end
#remote_tasks ⇒ Object
Returns the value of attribute remote_tasks.
# File 'lib/rbbt/workflow/refactor.rb', line 66

def remote_tasks
  @remote_tasks
end
Class Method Details
.doc_parse_chunks(str, pattern) ⇒ Object
# File 'lib/rbbt/workflow/doc.rb', line 21

def self.doc_parse_chunks(str, pattern)
  parts = str.split(pattern)
  return {} if parts.length < 2
  tasks = Hash[*parts[1..-1].collect{|v| v.strip}]
  tasks.delete_if{|t,d| d.empty?}
  tasks
end
.doc_parse_first_line(str) ⇒ Object
# File 'lib/rbbt/workflow/doc.rb', line 3

def self.doc_parse_first_line(str)
  if str.match(/^([^\n]*)\n\n(.*)/sm)
    str.replace $2
    $1
  else
    ""
  end
end
.doc_parse_up_to(str, pattern, keep = false) ⇒ Object
# File 'lib/rbbt/workflow/doc.rb', line 12

def self.doc_parse_up_to(str, pattern, keep = false)
  pre, _pat, _post = str.partition pattern
  if _pat
    [pre, (keep ? _pat << _post : _post)]
  else
    _post
  end
end
.job_path?(path) ⇒ Boolean
# File 'lib/rbbt/workflow/accessor.rb', line 19

def self.job_path?(path)
  path.split("/")[-4] == "jobs"
end
.load_inputs(dir, names, types) ⇒ Object
# File 'lib/rbbt/workflow/refactor/inputs.rb', line 15

def self.load_inputs(dir, names, types)
  inputs = IndiferentHash.setup({})
  names.zip(types) do |name, type|
    filename = File.join(directory, name.to_s)
    value = Task.load_input_from_file(filename, name, type)
    inputs[name] = value unless value.nil?
  end
  inputs
end
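For illustration only, assuming a directory that holds one file per input (the directory layout, input names, and values below are hypothetical):

  # inputs/
  #   flour    -> file containing "500"
  #   organic  -> file containing "true"
  inputs = Workflow.load_inputs('inputs', [:flour, :organic], [:float, :boolean])
  # => an IndiferentHash such as {"flour" => 500.0, "organic" => true},
  #    with each value parsed according to its declared type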
.nextflow_file_params(file) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 45

def self.nextflow_file_params(file)
  Open.read(file).scan(/params\.\w+/).collect{|p| p.split(".").last}.uniq
end
.nextflow_includes(file) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 49

def self.nextflow_includes(file)
  Open.read(file).scan(/^include\s*{\s*([^\s]*?)\s+.*?}\s*from\s+["'](.*?)["'](?:\s*params.*)?/).collect{|p| p}.uniq
end
.nextflow_recursive_params(file) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 53

def self.nextflow_recursive_params(file)
  params = nextflow_file_params(file)
  dir = File.dirname(file)
  nextflow_includes(file).inject(params) do |params,info|
    name_str, included_file = info
    included_file = File.join(dir, included_file)
    included_file += '.nf' unless File.exist?(included_file) || ! File.exist?(included_file + '.nf')
    name_str.split(";").each do |name|
      name = name.strip
      begin
        include_params = nextflow_recursive_params(included_file).collect{|p| [p,name] * "-"}
        params += include_params
      rescue
      end
    end
    params
  end
end
.original_require_workflow ⇒ Object
# File 'lib/rbbt/workflow/refactor.rb', line 93

alias original_require_workflow require_workflow
.parse_nextflow_schema(file) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 5

def self.parse_nextflow_schema(file)
  doc = Open.open(file){|f| JSON.parse(f.read) }
  description = doc["description"]

  properties = {}
  required = []

  properties[nil] = doc["properties"] if doc["properties"]
  required.concat doc["required"] if doc["required"]

  doc["definitions"].each do |section,section_info|
    next unless section_info["properties"]
    name = section_info["title"] || section
    properties[name] = section_info["properties"]
    required.concat section_info["required"] if section_info["required"]
  end if doc["definitions"]

  required = required.compact.flatten

  parameters = {}
  properties.each do |section,param_info|
    param_info.each do |name,info|
      options = {}
      type = info["type"]
      format = info["format"]
      input_desc = info["description"]
      input_section = info["description"]
      input_required = required.include?(name)
      options[:required] = true if input_required && ! OUTPUT_FIELDS.include?(name)

      if info.include?("enum")
        type = 'select'
        options[:select_options] = info["enum"]
      end

      parameters[name] = {type: type, format: format, description: input_desc, options: options, section: section}
    end
  end

  [description, parameters]
end
.parse_workflow_doc(doc) ⇒ Object
# File 'lib/rbbt/workflow/doc.rb', line 29

def self.parse_workflow_doc(doc)
  title = doc_parse_first_line doc
  description, task_info = doc_parse_up_to doc, /^# Tasks/i
  task_description, tasks = doc_parse_up_to task_info, /^##/, true
  tasks = doc_parse_chunks tasks, /^## (.*)/
  {:title => title.strip, :description => description.strip, :task_description => task_description.strip, :tasks => tasks}
end
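A minimal sketch of the expected markdown layout and the resulting hash; the workflow and task names are hypothetical:

  doc = <<~EOF
    Baking workflow

    Bakes breads and cakes.

    # Tasks

    General notes on the tasks.

    ## bake

    Bake a dough into bread.
  EOF

  Workflow.parse_workflow_doc(doc)
  # => { :title            => "Baking workflow",
  #      :description      => "Bakes breads and cakes.",
  #      :task_description => "General notes on the tasks.",
  #      :tasks            => {"bake" => "Bake a dough into bread."} }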
.plot_trace_job_times(data, plot, width = 800, height = 800) ⇒ Object
# File 'lib/rbbt/workflow/util/trace.rb', line 115 def self.plot_trace_job_times(data, plot, width=800, height=800) data.R <<-EOF, [:svg] rbbt.require('dplyr') rbbt.require('tidyr') rbbt.require('ggplot2') names(data) <- make.names(names(data)) data$id = data$Code data$content = data$Task data$start = data$Start data$end = data$End data$Project = data$Workflow tasks = data #theme_gantt <- function(base_size=11, base_family="Source Sans Pro Light") { theme_gantt <- function(base_size=11, base_family="Sans Serif") { ret <- theme_bw(base_size, base_family) %+replace% theme(panel.background = element_rect(fill="#ffffff", colour=NA), axis.title.x=element_text(vjust=-0.2), axis.title.y=element_text(vjust=1.5), title=element_text(vjust=1.2, family="Source Sans Pro Semibold"), panel.border = element_blank(), axis.line=element_blank(), panel.grid.minor=element_blank(), panel.grid.major.y = element_blank(), panel.grid.major.x = element_line(size=0.5, colour="grey80"), axis.ticks=element_blank(), legend.position="bottom", axis.title=element_text(size=rel(1.2), family="Source Sans Pro Semibold"), strip.text=element_text(size=rel(1.5), family="Source Sans Pro Semibold"), strip.background=element_rect(fill="#ffffff", colour=NA), panel.spacing.y=unit(1.5, "lines"), legend.key = element_blank()) ret } tasks.long <- tasks %>% gather(date.type, task.date, -c(Code,Project, Task, id, Start.second, End.second)) %>% arrange(date.type, task.date) %>% mutate(id = factor(id, levels=rev(unique(id)), ordered=TRUE)) x.breaks <- seq(length(tasks$Task) + 0.5 - 3, 0, by=-3) timeline <- ggplot(tasks.long, aes(y=id, yend=id, x=Start.second, xend=End.second, colour=Task)) + geom_segment() + geom_vline(xintercept=x.breaks, colour="grey80", linetype="dotted") + guides(colour=guide_legend(title=NULL)) + labs(x=NULL, y=NULL) + theme_gantt() + theme(axis.text.x=element_text(angle=45, hjust=1)) rbbt.png_plot('#{plot}', 'plot(timeline)', width=#{width}, height=#{height}, pointsize=6) EOF end |
.require_remote_workflow(wf_name, url) ⇒ Object
# File 'lib/rbbt/workflow/refactor.rb', line 95

def require_remote_workflow(wf_name, url)
  require 'rbbt/workflow/remote_workflow'
  eval "Object::#{wf_name.split("+").first} = RemoteWorkflow.new '#{ url }', '#{wf_name}'"
end
.require_workflow(wf_name, force_local = true) ⇒ Object
# File 'lib/rbbt/workflow/refactor.rb', line 100

def require_workflow(wf_name, force_local = true)
  if Open.remote?(wf_name) or Open.ssh?(wf_name)
    url = wf_name

    if Open.ssh?(wf_name)
      wf_name = File.basename(url.split(":").last)
    else
      wf_name = File.basename(url)
    end

    begin
      return require_remote_workflow(wf_name, url)
    ensure
      Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"}
    end
  end

  original_require_workflow(wf_name)
end
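Typical usage, with placeholder names: a plain name is resolved through the local workflow directory, while a URL loads the workflow as a RemoteWorkflow.

  Workflow.require_workflow "Baking"                     # local checkout or installed workflow
  Workflow.require_workflow "http://example.org/Baking"  # remote rbbt workflow server
  Baking.tasks.keys                                      # tasks are now available on the constant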
.trace(seed_jobs, options = {}) ⇒ Object
# File 'lib/rbbt/workflow/util/trace.rb', line 220

def self.trace(seed_jobs, options = {})
  jobs = []
  seed_jobs.each do |step|
    jobs += step.rec_dependencies + [step]
    step.info[:archived_info].each do |path,ainfo|
      next unless Hash === ainfo
      archived_step = Step.new path
      archived_step.define_singleton_method :info do
        ainfo
      end
      #class << archived_step
      #  self
      #end.define_method :info do
      #  ainfo
      #end
      jobs << archived_step
    end if step.info[:archived_info]
  end

  jobs = jobs.uniq.sort_by{|job| [job, job.info]; t = job.info[:started] || Open.mtime(job.path) || Time.now; Time === t ? t : Time.parse(t) }

  report_keys = options[:report_keys] || ""
  report_keys = report_keys.split(/,\s*/) if String === report_keys

  data = trace_job_times(jobs, options[:fix_gap], report_keys)
  summary = trace_job_summary(jobs, report_keys)

  raise "No jobs to process" if data.size == 0

  plot, size, width, height = options.values_at :plot, :size, :width, :height

  size = 800 if size.nil?
  width = size.to_i * 2 if width.nil?
  height = size if height.nil?

  plot_trace_job_times(data, plot, width, height) if plot

  if options[:plot_data]
    data
  else
    summary
  end
end
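An illustrative call, assuming a finished job from the hypothetical Baking workflow; :report_keys adds extra job info fields to the summary and :plot writes a timeline image:

  job = Baking.job(:bake, "batch1")
  Workflow.trace([job])                          # TSV summary per task
  Workflow.trace([job], :report_keys => "cpus")  # include the 'cpus' info field
  Workflow.trace([job], :plot => "trace.svg")    # also render a Gantt-style plot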
.trace_job_summary(jobs, report_keys = []) ⇒ Object
# File 'lib/rbbt/workflow/util/trace.rb', line 169 def self.trace_job_summary(jobs, report_keys = []) tasks_info = {} report_keys = report_keys.collect{|k| k.to_s} jobs.each do |dep| next unless dep.info[:end] task = [dep.workflow, dep.task_name].compact.collect{|s| s.to_s} * "#" info = tasks_info[task] ||= IndiferentHash.setup({}) dep_info = IndiferentHash.setup(dep.info) ddone = dep_info[:end] started = dep_info[:start] started = Time.parse started if String === started ddone = Time.parse ddone if String === ddone time = ddone - started info[:time] ||= [] info[:time] << time report_keys.each do |key| info[key] = dep_info[key] end dep.info[:config_keys].each do |kinfo| key, value, tokens = kinfo info[key.to_s] = value if report_keys.include? key.to_s end if dep.info[:config_keys] end summary = TSV.setup({}, "Task~Calls,Avg. Time,Total Time#:type=:list") tasks_info.each do |task, info| time_lists = info[:time] avg_time = Misc.mean(time_lists).to_i total_time = Misc.sum(time_lists).to_i calls = time_lists.length summary[task] = [calls, avg_time, total_time] end report_keys.each do |key| summary.add_field Misc.humanize(key) do |task| tasks_info[task][key] end end if Array === report_keys && report_keys.any? summary end |
.trace_job_times(jobs, fix_gap = false, report_keys = nil) ⇒ Object
# File 'lib/rbbt/workflow/util/trace.rb', line 6 def self.trace_job_times(jobs, fix_gap = false, report_keys = nil) data = TSV.setup({}, "Job~Code,Workflow,Task,Start,End#:type=:list") min_start = nil max_done = nil jobs.each do |job| next unless job.info[:end] started = job.info[:start] ddone = job.info[:end] started = Time.parse started if String === started ddone = Time.parse ddone if String === ddone code = [job.workflow, job.task_name].compact.collect{|s| s.to_s} * " · " code = job.name + " - " + code data[job.path] = [code,job.workflow.to_s, job.task_name, started, ddone] if min_start.nil? min_start = started else min_start = started if started < min_start end if max_done.nil? max_done = ddone else max_done = ddone if ddone > max_done end end data.add_field "Start.second" do |k,value| value["Start"] - min_start end data.add_field "End.second" do |k,value| value["End"] - min_start end if fix_gap ranges = [] data.through do |k,values| start, eend = values.values_at "Start.second", "End.second" ranges << (start..eend) end gaps = {} last = nil Misc.collapse_ranges(ranges).each do |range| start = range.begin eend = range.end if last gaps[last] = start - last end last = eend end data.process "End.second" do |value,k,values| gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size}) value - gap end data.process "Start.second" do |value,k,values| gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size}) value - gap end total_gaps = Misc.sum(gaps.collect{|k,v| v}) Log.info "Total gaps: #{total_gaps} seconds" end if report_keys && report_keys.any? job_keys = {} jobs.each do |job| job_info = IndiferentHash.setup(job.info) report_keys.each do |key| job_keys[job.path] ||= {} job_keys[job.path][key] = job_info[key] end end report_keys.each do |key| data.add_field Misc.humanize(key) do |p,values| job_keys[p][key] end end end start = data.column("Start.second").values.flatten.collect{|v| v.to_f}.min eend = data.column("End.second").values.flatten.collect{|v| v.to_f}.max total = eend - start unless eend.nil? || start.nil? Log.info "Total time elapsed: #{total} seconds" if total if report_keys && report_keys.any? job_keys = {} report_keys.each do |key| jobs.each do |job| job_keys[job.path] ||= {} job_keys[job.path][key] = job.info[key] end end report_keys.each do |key| data.add_field Misc.humanize(key) do |p,values| job_keys[p][key] end end end data end |
.workflow_dir ⇒ Object
# File 'lib/rbbt/workflow/refactor.rb', line 124

def workflow_dir
  @workflow_dir || ENV["RBBT_WORKFLOW_DIR"] || begin
    workflow_dir_config = Path.setup("etc/workflow_dir")
    if workflow_dir_config.exists?
      Path.setup(workflow_dir_config.read.strip)
    else
      Path.setup('workflows').find(:user)
    end
  end
end
.workflow_for(path) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 204

def self.workflow_for(path)
  begin
    Kernel.const_get File.dirname(File.dirname(path))
  rescue
    nil
  end
end
.workflow_repo ⇒ Object
# File 'lib/rbbt/workflow/refactor.rb', line 137

def workflow_repo
  @workflow_repo || ENV["RBBT_WORKFLOW_REPO"] || begin
    workflow_repo_config = Path.setup("etc/workflow_repo")
    if workflow_repo_config.exists?
      workflow_repo_config.read.strip
    else
      'https://github.com/Rbbt-Workflows/'
    end
  end
end
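Both locations can be overridden through the environment before workflows are required; the path and organization below are placeholders:

  ENV["RBBT_WORKFLOW_DIR"]  = "/data/rbbt/workflows"
  ENV["RBBT_WORKFLOW_REPO"] = "https://github.com/my-org/"

  Workflow.workflow_dir   # => "/data/rbbt/workflows"
  Workflow.workflow_repo  # => "https://github.com/my-org/"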
Instance Method Details
#assign_dep_inputs(_inputs, options, all_d, task_info) ⇒ Object
# File 'lib/rbbt/workflow/dependencies.rb', line 106 def assign_dep_inputs(_inputs, , all_d, task_info) IndiferentHash.setup(_inputs) .each{|i,v| next if i == :compute or i == "compute" case v when :compute compute = v when Symbol rec_dependency = all_d.flatten.select{|d| d.task_name.to_sym == v }.first if rec_dependency.nil? if _inputs.include?(v) #_inputs[i] = _inputs.delete(v) _inputs[i] = _inputs[v] unless _inputs.include? i #_inputs.delete(v) else _inputs[i] = v unless _inputs.include? i end else = task_info[:input_options][i] || {} #ToDo why was this always true? if [:stream] or true #rec_dependency.run(true).grace unless rec_dependency.done? or rec_dependency.running? _inputs[i] = rec_dependency else rec_dependency.abort if rec_dependency.streaming? and not rec_dependency.running? rec_dependency.clean if rec_dependency.error? or rec_dependency.aborted? if rec_dependency.streaming? and rec_dependency.running? _inputs[i] = rec_dependency.join.load else rec_dependency.run(true) rec_dependency.join _inputs[i] = rec_dependency.load end end end else _inputs[i] = v end } if _inputs end |
#dep(*dependency, &block) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 50

def dep(*dependency, &block)
  @dependencies ||= []
  dependency = [tasks.keys.last] if dependency.empty? && ! block_given?
  if block_given?
    if dependency.any?
      wf, task_name, options = dependency
      options, task_name = task_name, nil if Hash === task_name
      options, wf = wf, nil if Hash === wf
      task_name, wf = wf, self if task_name.nil?
      DependencyBlock.setup block, [wf, task_name, options]
    end
    @dependencies << block
  else
    if Module === dependency.first or
       (defined? RemoteWorkflow and RemoteWorkflow === dependency.first) or
       Hash === dependency.last

      dependency = ([self] + dependency) unless Module === dependency.first || (defined?(RemoteWorkflow) && RemoteWorkflow === dependency.first)
      @dependencies << dependency
    else
      @dependencies.concat dependency
    end
  end
end
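A minimal sketch of how dep chains tasks inside a workflow definition; the Baking module, its inputs, and the input declaration (provided by InputModule) are illustrative, not part of this API:

  require 'rbbt/workflow'

  module Baking
    extend Workflow

    input :flour, :float, "Flour in grams", 500
    task :dough => :string do |flour|
      "dough made with #{flour}g of flour"
    end

    dep :dough
    task :bake => :string do
      "baked " + step(:dough).load
    end
  end

  Baking.job(:bake, "batch1", :flour => 300).run
  # => "baked dough made with 300.0g of flour"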
#dep_task(name, workflow, oname, *rest, &block) ⇒ Object Also known as: task_alias
# File 'lib/rbbt/workflow/definition.rb', line 79 def dep_task(name, workflow, oname, *rest, &block) dep(workflow, oname, *rest, &block) extension :dep_task unless @extension returns workflow.tasks[oname].result_description if workflow.tasks.include?(oname) unless @result_description task name do raise RbbtException, "dep_task does not have any dependencies" if dependencies.empty? Step.wait_for_jobs dependencies.select{|d| d.streaming? } dep = dependencies.last dep.join raise dep.get_exception if dep.error? raise Aborted, "Aborted dependency #{dep.path}" if dep.aborted? set_info :result_type, dep.info[:result_type] forget = config :forget_dep_tasks, "forget_dep_tasks", :default => FORGET_DEP_TASKS if forget remove = config :remove_dep_tasks, "remove_dep_tasks", :default => REMOVE_DEP_TASKS self.archive_deps self.copy_files_dir self.dependencies = self.dependencies - [dep] Open.rm_rf self.files_dir if Open.exist? self.files_dir FileUtils.cp_r dep.files_dir, self.files_dir if Open.exist?(dep.files_dir) if dep.overriden || ! Workflow.job_path?(dep.path) Open.link dep.path, self.tmp_path else Open.ln_h dep.path, self.tmp_path case remove.to_s when 'true' dep.clean when 'recursive' (dep.dependencies + dep.rec_dependencies).uniq.each do |d| next if d.overriden d.clean unless config(:remove_dep, d.task_signature, d.task_name, d.workflow.to_s, :default => true).to_s == 'false' end dep.clean unless config(:remove_dep, dep.task_signature, dep.task_name, dep.workflow.to_s, :default => true).to_s == 'false' end end else if Open.exists?(dep.files_dir) Open.rm_rf self.files_dir Open.link dep.files_dir, self.files_dir end if defined?(RemoteStep) && RemoteStep === dep Open.write(self.tmp_path, Open.read(dep.path)) else Open.link dep.path, self.path end end nil end end |
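dep_task (aliased as task_alias) re-exposes another workflow's task as a task of the current one; a sketch using the hypothetical Baking workflow:

  # Make Baking#bake available from this workflow under the name :bake_bread
  dep_task :bake_bread, Baking, :bake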
#desc(description) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 34

def desc(description)
  @description = description
end
#documentation_markdown ⇒ Object
# File 'lib/rbbt/workflow/doc.rb', line 37

def documentation_markdown
  return "" if @libdir.nil?

  file = @libdir['workflow.md'].find
  file = @libdir['README.md'].find unless file.exists?

  if file.exists?
    file.read
  else
    ""
  end
end
#example(task_name, example) ⇒ Object
# File 'lib/rbbt/workflow/examples.rb', line 21

def example(task_name, example)
  task_info(task_name.to_sym)[:input_types].collect do |input,type|
    next unless example_dir[task_name][example][input].exists?
    [input, type, example_dir[task_name][example][input].find]
  end.compact
end
#example_inputs(task_name, example) ⇒ Object
# File 'lib/rbbt/workflow/examples.rb', line 47

def example_inputs(task_name, example)
  dir = example_dir[task_name][example]
  info = self.task_info(task_name)
  Workflow.load_inputs(dir, info[:inputs], info[:input_types])
end
#example_step(task_name, example = "Example", new_inputs = {}) ⇒ Object
# File 'lib/rbbt/workflow/examples.rb', line 53

def example_step(task_name, example="Example", new_inputs = {})
  inputs = example_inputs(task_name, example)

  if new_inputs and new_inputs.any?
    IndiferentHash.setup(new_inputs)
    inputs = inputs.merge(new_inputs)
  end

  self.job(task_name, example, inputs)
end
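A sketch of how an example is laid out on disk and turned into a job; the paths, names, and values are hypothetical:

  # examples/bake/Basic/flour   -> file containing "250"
  step = Baking.example_step(:bake, "Basic")                 # job named "Basic" with the stored inputs
  step = Baking.example_step(:bake, "Basic", :flour => 400)  # same example, overriding one input
  step.run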
#examples ⇒ Object
# File 'lib/rbbt/workflow/examples.rb', line 8

def examples
  return {} unless self.libdir.examples.exists?
  examples = {}
  example_dir.glob("*/*").each do |example_dir|
    example = File.basename(example_dir)
    task_name = File.basename(File.dirname(example_dir))
    examples[task_name] ||= []
    examples[task_name] << example
  end
  IndiferentHash.setup examples
  examples
end
#export_asynchronous(*names) ⇒ Object Also known as: export
# File 'lib/rbbt/workflow/definition.rb', line 207

def export_asynchronous(*names)
  unexport *names
  asynchronous_exports.concat names
  asynchronous_exports.uniq!
  asynchronous_exports
end
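Exports are typically declared in the workflow definition to control which tasks a workflow server offers and how clients are expected to run them; a sketch with hypothetical task names:

  export :bake                 # alias of export_asynchronous
  export_synchronous :dough
  export_exec :knead
  export_stream :bake_many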
#export_exec(*names) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 193

def export_exec(*names)
  unexport *names
  exec_exports.concat names
  exec_exports.uniq!
  exec_exports
end
#export_stream(*names) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 214

def export_stream(*names)
  unexport *names
  stream_exports.concat names
  stream_exports.uniq!
  stream_exports
end
#export_synchronous(*names) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 200

def export_synchronous(*names)
  unexport *names
  synchronous_exports.concat names
  synchronous_exports.uniq!
  synchronous_exports
end
#extension(extension) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 38

def extension(extension)
  @extension = extension
end
#fast_load_id(id) ⇒ Object Also known as: load_id
# File 'lib/rbbt/workflow/refactor.rb', line 79

def fast_load_id(id)
  path = if Path === directory
           directory[id].find
         else
           File.join(directory, id)
         end
  task = task_for path
  return remote_tasks[task].load_id(id) if remote_tasks && remote_tasks.include?(task)
  return Workflow.fast_load_step path
end
#helper(name, *args, &block) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 25

def helper(name, *args, &block)
  if block_given?
    helpers[name] = block
  else
    raise RbbtException, "helper #{name} unkown in #{self} workflow" unless helpers[name]
    helpers[name].call(*args)
  end
end
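A sketch of registering a helper and calling it, both directly and from a task body; the names are illustrative, and helpers being callable by name inside task blocks is the usual rbbt idiom:

  helper :greeting do |name|
    "Hello #{name}"
  end

  helper(:greeting, "world")   # => "Hello world"

  input :name, :string, "Who to salute", "world"
  task :salute => :string do |name|
    greeting(name)             # helpers are available inside task bodies
  end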
#id_for(path) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 195

def id_for(path)
  if workdir.respond_to? :find
    workdir_find = workdir.find
  else
    workdir_find = workdir
  end

  Misc.path_relative_to workdir_find, path
end
#import(source, *args) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 223

def import(source, *args)
  if args.empty?
    tasks = source.tasks.collect{|n,t| n} + source.helpers.collect{|n,h| n }
  else
    tasks = args.flatten
  end

  tasks.each do |task|
    Log.high "Task #{task} from #{source.to_s} is already present in #{self.to_s} and will be cloacked" if self.tasks.include? task.to_sym
    self.tasks[task.to_sym] = source.tasks[task.to_sym] if source.tasks.include? task.to_sym
    self.task_dependencies[task.to_sym] = source.task_dependencies[task.to_sym] if source.tasks.include? task.to_sym
    self.task_description[task.to_sym] = source.task_description[task.to_sym] if source.tasks.include? task.to_sym
    self.helpers[task.to_sym] = source.helpers[task.to_sym] if source.helpers.include? task.to_sym
  end
end
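A sketch of importing tasks from another (hypothetical) workflow:

  import Baking                  # copy every task and helper from Baking
  import Baking, :dough, :bake   # or copy only selected tasks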
#load_cromwell(file) ⇒ Object
# File 'lib/rbbt/workflow/integration/cromwell.rb', line 18 def load_cromwell(file) jar = Rbbt.software.opt.jar["wdltool.jar"].produce.find inputs = JSON.load(CMD.cmd("java -jar '#{jar}' inputs '#{file}'")) workflow_inputs = {} inputs.each do |input,input_type| workflow, task, input_name = input.split(".") workflow_inputs[workflow] ||= {} if input_name.nil? input_name = task else input_name = [task, input_name] * "." end workflow_inputs[workflow][input_name] = input_type end workflow_inputs.each do |workflow,input_list| input_list.each do |input_name,input_type| input_type = input_type.split(" ").last.sub('?','') input_type_fixed = case input_type when "File", "file" :file when "Int" :integer when /Array/ :array else input_type.downcase.to_sym end desc = [workflow, input_name] * "." default = nil input input_name, input_type_fixed, desc, default, :nofile => true end task workflow => :string do |*args| cromwell = file = {} Misc.in_dir(self.files_dir) do ["metadata-output"] = file('metadata.json') ["inputs"] = file('inputs') cromwell_inputs = {} self.inputs.to_hash.each do |input, value| next if value.nil? key = [workflow.to_s, input] * "." cromwell_inputs[key] = value end Open.write(file('inputs'), cromwell_inputs.to_json ) Cromwell.run_cromwell(cromwell, self.files_dir, ) end Open.read(Dir.glob(File.join(files_dir, "/cromwell-executions/#{workflow}/*/call-*/execution/stdout")).first) end end end |
#load_documentation ⇒ Object
# File 'lib/rbbt/workflow/doc.rb', line 48

def load_documentation
  return if @documentation
  @documentation ||= Workflow.parse_workflow_doc documentation_markdown
  @documentation[:tasks].each do |task, description|
    if task.include? "#"
      workflow, task = task.split("#")
      workflow = begin
                   Kernel.const_get workflow
                 rescue
                   next
                 end
    else
      workflow = self
    end

    if workflow.tasks.include? task.to_sym
      workflow.tasks[task.to_sym].description = description
    else
      Log.low "Documentation for #{ task }, but not a #{ workflow.to_s } task"
    end
  end
end
#log(status, message = nil, &block) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 23

def log(status, message = nil, &block)
  Step.log(status, message, nil, &block)
end
#nextflow(path, *args) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 184

def nextflow(path, *args)
  if File.directory?(path)
    nextflow_dir path, *args
  elsif File.exist?(path)
    nextflow_file path, *args
  else
    nextflow_project path, *args
  end
end
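A sketch of wrapping Nextflow pipelines as workflow tasks; the paths and project name below are placeholders:

  module NFWrappers
    extend Workflow

    nextflow "/data/pipelines/rnaseq"        # directory with main.nf (and nextflow_schema.json)
    nextflow "/data/pipelines/qc/main.nf"    # a single .nf file
    nextflow "nf-core/sarek"                 # neither a directory nor a file: pulled as a project
  end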
#nextflow_dir(path, output = nil) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 173

def nextflow_dir(path, output = nil)
  main = File.join(path, 'main.nf')
  nextflow_file main, File.basename(path), output
end
#nextflow_file(file, name = nil, output = nil) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 72 def nextflow_file(file, name = nil, output = nil) name, output = nil, name if Hash === name if Hash === output result, output = output.collect.first else result = :text end dir = Path.setup(File.dirname(file)) nextflow_schema = dir['nextflow_schema.json'] description, params = Workflow.parse_nextflow_schema(nextflow_schema) if nextflow_schema.exists? file = file + '.nf' unless File.exist?(file) || ! File.exist?(file + '.nf') file = File.(file) name ||= File.basename(file).sub(/\.nf$/,'').gsub(/\s/,'_') Workflow.nextflow_recursive_params(file).each do |param| p,_sep, section = param.partition("-") if ! params.include?(p) params[p] = {type: :string, description: "Undocumented"} end end used_params = [] desc description params.each do |name,info| input name.to_sym, info[:type], info[:description], nil, info[:options].merge(:noload => true) end task name => result do work = file('work') profile = config :profile, :nextflow resume = config :resume, :nextflow config_file = config :config, :nextflow nextflow_inputs = {} inputs.zip(inputs.fields).collect do |v,f| v = if String === v && m = v.match(/^JOB_FILE:(.*)/) file(m[1]) elsif v.nil? Rbbt::Config.get(['nextflow', f] * "_", 'default', f) else v end if f.to_s.include?("-") p,_sep, section = f.to_s.partition("-") name = [section, p] * "." else name = f end case name.to_s when 'outdir' output = nextflow_inputs[name] = v || output || file('output') when 'output' output = nextflow_inputs[name] = v || output || self.tmp_path else nextflow_inputs[name] = v end end current_pwd = FileUtils.pwd Misc.in_dir file('stage') do cmd = "nextflow " cmd += " -C #{config_file}" if config_file cmd += " run" cmd += " -work-dir #{work} -ansi-log false" cmd += " -profile #{profile}" if profile cmd += " -resume" if resume == 'true' Dir.glob(current_pwd + "/*").each do |file| target = File.basename(file) Open.ln_s file, target unless File.exist?(target) end cmd("#{cmd} #{file}", nextflow_inputs.merge('add_option_dashes' => true)) end if output && Open.exists?(output) if File.directory?(output) Dir.glob(output + "/**/*") * "\n" else output_file = output Open.link output, self.tmp_path nil end else work[File.join("*", "*", "*")].glob * "\n" end end end |
#nextflow_project(project, *args) ⇒ Object
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 178

def nextflow_project(project, *args)
  CMD.cmd_log("nextflow pull #{project}")
  directory = File.join(ENV["HOME"], '.nextflow/assets', project)
  nextflow_dir directory, *args
end
#override_dependencies(inputs) ⇒ Object
# File 'lib/rbbt/workflow/dependencies.rb', line 58

def override_dependencies(inputs)
  override_dependencies = IndiferentHash.setup({})
  return override_dependencies if inputs.nil?
  inputs.each do |key,value|
    if String === key && m = key.match(/(.*)#(.*)/)
      workflow, task = m.values_at 1, 2
      workflow = self.to_s if workflow.empty?
      override_dependencies[workflow] ||= IndiferentHash.setup({})
      override_dependencies[workflow][task] = value
    end
  end
  override_dependencies
end
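Overrides are taken from job inputs whose keys have the form "Workflow#task"; a sketch with hypothetical names and paths:

  # Use a precomputed result for the Baking#dough dependency instead of running it
  Baking.job(:bake, "batch1", "Baking#dough" => "/data/precomputed/dough")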
#real_dependencies(task, orig_jobname, inputs, dependencies) ⇒ Object
# File 'lib/rbbt/workflow/dependencies.rb', line 151 def real_dependencies(task, orig_jobname, inputs, dependencies) real_dependencies = [] path_deps = {} override_dependencies = override_dependencies(inputs) overriden = false dependencies.each do |dependency| _inputs = IndiferentHash.setup(inputs.dup) jobname = orig_jobname jobname = _inputs[:jobname] if _inputs.include? :jobname real_dep = case dependency when Array workflow, dep_task, = dependency if override_dependencies[workflow.to_s] && value = override_dependencies[workflow.to_s][dep_task] overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (.nil? || ! [:not_overriden]) && ! unlocated_override?(value) #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value) setup_override_dependency(value, workflow, dep_task) else compute = [:compute] if if && [:canfail] compute = case compute when nil :canfail when Array compute + [:canfail] else [compute, :canfail] end end all_d = (real_dependencies + real_dependencies.flatten.collect{|d| d.rec_dependencies} ).flatten.compact.uniq _inputs = assign_dep_inputs(_inputs, , all_d, workflow.task_info(dep_task)) jobname = _inputs.delete :jobname if _inputs.include? :jobname job = workflow._job(dep_task, jobname, _inputs) ComputeDependency.setup(job, compute) if compute overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden]) job end when Step job = dependency overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden]) job when Symbol if override_dependencies[self.to_s] && value = override_dependencies[self.to_s][dependency] overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (.nil? || ! [:not_overriden]) && ! unlocated_override?(value) #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value) setup_override_dependency(value, self, dependency) else job = _job(dependency, jobname, _inputs) overriden = true if Symbol === job.overriden && (.nil? || ! [:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden]) job end when Proc if DependencyBlock === dependency orig_dep = dependency.dependency wf, task_name, = orig_dep if override_dependencies[wf.to_s] && value = override_dependencies[wf.to_s][task_name] overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (.nil? || ! [:not_overriden]) && ! unlocated_override?(value) #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value) dep = setup_override_dependency(value, wf, task_name) else = {} if .nil? compute = [:compute] if [:canfail] compute = case compute when nil :canfail when Array compute + [:canfail] else [compute, :canfail] end end = IndiferentHash.setup(.dup) dep = dependency.call jobname, _inputs.merge(), real_dependencies dep = [dep] unless Array === dep new_=[] dep.each{|d| next if d.nil? if Hash === d d = d.merge() d[:workflow] ||= wf d[:task] ||= task_name _override_dependencies = override_dependencies.merge(override_dependencies(d[:inputs] || {})) d = if _override_dependencies[d[:workflow].to_s] && value = _override_dependencies[d[:workflow].to_s][d[:task]] overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (.nil? || ! [:not_overriden]) && ! unlocated_override?(value) #overriden = true if (options.nil? || ! options[:not_overriden]) && ! 
unlocated_override?(value) setup_override_dependency(value, d[:workflow], d[:task]) else task_info = d[:workflow].task_info(d[:task]) _inputs = assign_dep_inputs({}, .merge(d[:inputs] || {}), real_dependencies, task_info) _jobname = d.include?(:jobname) ? d[:jobname] : jobname job = d[:workflow]._job(d[:task], _jobname, _inputs) overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden]) job end end ComputeDependency.setup(d, compute) if compute new_ << d } dep = new_ end else _inputs = IndiferentHash.setup(_inputs.dup) dep = dependency.call jobname, _inputs, real_dependencies if Hash === dep dep[:workflow] ||= wf || self _override_dependencies = override_dependencies.merge(override_dependencies(dep[:inputs] || {})) if _override_dependencies[dep[:workflow].to_s] && value = _override_dependencies[dep[:workflow].to_s][dep[:task]] overriden = true if (inputs.nil? || ! inputs[:not_overriden]) && (.nil? || ! [:not_overriden]) && ! unlocated_override?(value) #overriden = true if (options.nil? || ! options[:not_overriden]) && ! unlocated_override?(value) setup_override_dependency(value, dep[:workflow], dep[:task]) else task_info = (dep[:task] && dep[:workflow]) ? dep[:workflow].task_info(dep[:task]) : nil _inputs = assign_dep_inputs({}, dep[:inputs], real_dependencies, task_info) _jobname = dep.include?(:jobname) ? dep[:jobname] : jobname job = dep[:workflow]._job(dep[:task], _jobname, _inputs) overriden = true if Symbol === job.overriden? && (d.nil? || ! d[:not_overriden]) && (inputs.nil? || ! inputs[:not_overriden]) job end end end dep else raise "Dependency for #{task.name} not understood: #{Misc.fingerprint dependency}" end real_dependencies << real_dep end [real_dependencies.flatten.compact, overriden] end |
#rec_dependencies(taskname, seen = []) ⇒ Object
# File 'lib/rbbt/workflow/dependencies.rb', line 2 def rec_dependencies(taskname, seen = []) @rec_dependencies ||= {} @rec_dependencies[taskname] ||= [] unless task_dependencies.include?(taskname) @rec_dependencies[taskname] ||= begin deps = task_dependencies[taskname] all_deps = [] deps.each do |dep| next if seen.include?(dep) if DependencyBlock === dep all_deps << dep.dependency if dep.dependency else all_deps << dep unless Proc === dep end begin case dep when Array wf, t, o = dep wf.rec_dependencies(t.to_sym, seen + [dep]).each do |d| if Array === d new = d.dup else new = [dep.first, d] end if Hash === o and not o.empty? if Hash === new.last hash = new.last.dup o.each{|k,v| hash[k] ||= v} new[new.length-1] = hash else new.push o.dup end end all_deps << new end if wf && t when String, Symbol rec_deps = rec_dependencies(dep.to_sym, seen + [dep]) all_deps.concat rec_deps when DependencyBlock dep = dep.dependency raise TryAgain end rescue TryAgain retry end end all_deps.uniq end end |
#rec_input_defaults(taskname) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 82

def rec_input_defaults(taskname)
  rec_inputs = rec_inputs(taskname)
  [taskname].concat(rec_dependencies(taskname)).inject(IndiferentHash.setup({})){|acc, tn|
    if Array === tn and tn[0] and tn[1]
      new = tn.first.tasks[tn[1].to_sym].input_defaults
    elsif Symbol === tn
      new = tasks[tn.to_sym].input_defaults
    else
      next acc
    end
    acc = new.merge(acc)
    acc.delete_if{|input,defaults| not rec_inputs.include? input}
    acc
  }.tap{|h| IndiferentHash.setup(h)}
end
#rec_input_descriptions(taskname) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 143

def rec_input_descriptions(taskname)
  rec_inputs = rec_inputs(taskname)
  [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn|
    if Array === tn and tn[0] and tn[1]
      new = tn.first.tasks[tn[1].to_sym].input_descriptions
    elsif Symbol === tn
      new = tasks[tn.to_sym].input_descriptions
    else
      next acc
    end
    acc = new.merge(acc)
    acc.delete_if{|input,defaults| not rec_inputs.include? input}
    acc
  }.tap{|h| IndiferentHash.setup(h)}
end
#rec_input_options(taskname) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 159

def rec_input_options(taskname)
  rec_inputs = rec_inputs(taskname)
  [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn|
    if Array === tn and tn[0] and tn[1]
      new = tn.first.tasks[tn[1].to_sym].input_options
    elsif Symbol === tn
      new = tasks[tn.to_sym].input_options
    else
      next acc
    end
    acc = new.merge(acc)
    acc = acc.delete_if{|input,defaults| not rec_inputs.include? input}
    acc
  }.tap{|h| IndiferentHash.setup(h)}
end
#rec_input_types(taskname) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 98

def rec_input_types(taskname)
  rec_inputs = rec_inputs(taskname)
  [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn|
    if Array === tn and tn[0] and tn[1]
      new = tn.first.tasks[tn[1].to_sym].input_types
    elsif Symbol === tn
      new = tasks[tn.to_sym].input_types
    else
      next acc
    end
    acc = new.merge(acc)
    acc.delete_if{|input,defaults| not rec_inputs.include? input}
    acc
  }.tap{|h| IndiferentHash.setup(h)}
end
#rec_input_use(taskname) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 114

def rec_input_use(taskname)
  task = task_from_dep(taskname)
  deps = rec_dependencies(taskname)
  inputs = {}
  task.inputs.each do |input|
    name = task.name
    workflow = (task.workflow || self).to_s
    inputs[input] ||= {}
    inputs[input][workflow] ||= []
    inputs[input][workflow] << name
  end

  dep_inputs = Task.dep_inputs deps, self

  dep_inputs.each do |dep,is|
    name = dep.name
    workflow = dep.workflow
    is.each do |input|
      inputs[input] ||= {}
      inputs[input][workflow] ||= []
      inputs[input][workflow] << name
    end
  end

  inputs
end
#rec_inputs(taskname) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 75

def rec_inputs(taskname)
  task = task_from_dep(taskname)
  deps = rec_dependencies(taskname)
  dep_inputs = task.dep_inputs deps, self
  task.inputs + dep_inputs.values.flatten
end
#resumable ⇒ Object
# File 'lib/rbbt/workflow/refactor.rb', line 35

def resumable
  Log.warn "RESUMABLE MOCKED"
end
#returns(description) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 46

def returns(description)
  @result_description = description
end
#setup_override_dependency(dep, workflow, task_name) ⇒ Object
# File 'lib/rbbt/workflow/dependencies.rb', line 72

def setup_override_dependency(dep, workflow, task_name)
  return [] if dep == :skip || dep == 'skip'
  unlocated = unlocated_override?(dep)

  dep = Workflow.load_step(dep) if not Step === dep

  dep.original_workflow ||= dep.workflow if dep.workflow
  dep.original_task_name ||= dep.task_name if dep.task_name

  dep.original_task_name ||= dep.path.split("/")[-3]
  dep.original_task_name ||= dep.path.split("/")[-2]

  dep.workflow = workflow
  dep.info[:name] = dep.name

  begin
    workflow = Kernel.const_get workflow if String === workflow
    dep.task = workflow.tasks[task_name] if dep.task.nil? && workflow.tasks.include?(task_name)
  rescue
    Log.exception $!
  end

  dep.task_name = task_name
  dep.overriden = dep.original_task_name.to_sym if dep.original_task_name && dep.original_task_name.to_s != task_name.to_s || ! unlocated

  dep.extend step_module

  dep
end
#task(name, &block) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 134 def task(name, &block) if Hash === name type = name.first.last name = name.first.first else result_type = consume_result_type || :marshal end name = name.to_sym block = self.method(name) unless block_given? task_info = { :name => name, :inputs => consume_inputs, :description => consume_description, :input_types => consume_input_types, :result_type => (String === type ? type.to_sym : type), :result_description => consume_result_description, :input_defaults => consume_input_defaults, :input_descriptions => consume_input_descriptions, :required_inputs => consume_required_inputs, :extension => consume_extension, :resumable => consume_resumable, :input_options => } task_info[:extension] = case task_info[:result_type].to_s when "tsv" "tsv" when "yaml" "yaml" when "marshal" "marshal" when "json" "json" else nil end if task_info[:extension].nil? task = Task.setup(task_info, &block) last_task = task tasks[name] = task task_dependencies[name] = consume_dependencies task end |
#task_exports ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 229

def task_exports
  [exec_exports, synchronous_exports, asynchronous_exports, stream_exports].compact.flatten.uniq
end
#task_for(path) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 212

def task_for(path)
  if workdir.respond_to? :find
    workdir_find = workdir.find
  else
    workdir_find = workdir
  end

  workdir_find = File.expand_path(workdir_find)
  path = File.expand_path(path)
  dir = File.dirname(path)

  begin
    Misc.path_relative_to(workdir_find, dir).sub(/([^\/]+)\/.*/,'\1')
  rescue
    nil
  end
end
#task_from_dep(dep) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 176

def task_from_dep(dep)
  task = case dep
         when Array
           dep.first.tasks[dep[1]]
         when String
           tasks[dep.to_sym]
         when Symbol
           tasks[dep.to_sym]
         end
  raise "Unknown dependency: #{Misc.fingerprint dep}" if task.nil?
  task
end
#task_info(name) ⇒ Object
# File 'lib/rbbt/workflow/accessor.rb', line 27

def task_info(name)
  name = name.to_sym
  task = tasks[name]
  raise "No '#{name}' task in '#{self.to_s}' Workflow" if task.nil?
  id = File.join(self.to_s, name.to_s)
  @task_info ||= {}
  @task_info[id] ||= begin
    description = task.description
    result_description = task.result_description
    result_type = task.result_type
    inputs = rec_inputs(name).uniq
    input_types = rec_input_types(name)
    input_descriptions = rec_input_descriptions(name)
    input_use = rec_input_use(name)
    input_defaults = rec_input_defaults(name)
    input_options = rec_input_options(name)
    extension = task.extension
    export = case
             when (synchronous_exports.include?(name.to_sym) or synchronous_exports.include?(name.to_s))
               :synchronous
             when (asynchronous_exports.include?(name.to_sym) or asynchronous_exports.include?(name.to_s))
               :asynchronous
             when (exec_exports.include?(name.to_sym) or exec_exports.include?(name.to_s))
               :exec
             when (stream_exports.include?(name.to_sym) or stream_exports.include?(name.to_s))
               :stream
             else
               :none
             end

    dependencies = task_dependencies[name].select{|dep| String === dep or Symbol === dep}
    {
      :id => id,
      :description => description,
      :export => export,
      :inputs => inputs,
      :input_types => input_types,
      :input_descriptions => input_descriptions,
      :input_defaults => input_defaults,
      :input_options => input_options,
      :input_use => input_use,
      :result_type => result_type,
      :result_description => result_description,
      :dependencies => dependencies,
      :extension => extension
    }
  end
end
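A sketch of inspecting a task, continuing the hypothetical Baking workflow; the exact contents depend on the declared inputs and dependencies:

  info = Baking.task_info(:bake)
  info[:result_type]    # => :string
  info[:dependencies]   # => [:dough]
  info[:inputs]         # => includes :flour, collected recursively from dependencies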
#unexport(*names) ⇒ Object
# File 'lib/rbbt/workflow/definition.rb', line 184

def unexport(*names)
  names = names.collect{|n| n.to_s} + names.collect{|n| n.to_sym}
  names.uniq!
  exec_exports.replace exec_exports - names if exec_exports
  synchronous_exports.replace synchronous_exports - names if synchronous_exports
  asynchronous_exports.replace asynchronous_exports - names if asynchronous_exports
  stream_exports.replace stream_exports - names if stream_exports
end