fix: allow creating a stage when the calling process working dir is not in the dvc repo
dvc.yaml was hardcoded in the _create_stage method of the StageCache class. In some use cases, we want to cache the stage while the calling process is not inside the dvc repo. Hardcoding dvc.yaml makes the path verification raise an error (StagePathOutsideError), because the resolved dvc.yaml lands outside the repository.
By adding a path parameter, we can pass the stage's file path directly. So now it is possible to create the cache!
- [x] ❗ I have followed the Contributing to DVC checklist.
- [x] 📖 If this PR requires documentation updates, I have created a separate PR (or issue, at least) in dvc.org and linked it here.
Thank you for the contribution - we'll try to review it as soon as possible. 🙏
@skshetry I may need help with the performance benchmark regression. I don't think my modification impacted the --help command.
@gbmarc1, could you please share your use case here? The default path limited to dvc.yaml is intended (until we see a better use case). Why do you want to use a dvc.yaml file that's not in the repo?
Regarding StageCache._create_stage(), the stage that it returns is used only internally within StageCache; it is never exposed to outside code, because StageCache does not keep track of the stage's path. That is also intended, because moving stages around in dvc.yaml files is common and should not break our run-cache.
The high-level objective is to have a repo where I can manage multiple models. My idea was to create an abstraction over DVC to manage the pipelines for those models, leveraging the DVC execution "engine" rather than redeveloping what DVC already does great.
My abstraction consists of augmented Pipeline and Stage objects that I convert to DVC Pipeline/Stage objects for execution. Here is a code snippet of my prototype:
```python
import logging
import os
from pathlib import Path
from typing import Optional

from dvc.repo import Repo as DVCRepo
from dvc.repo.index import Index
from dvc.repo.reproduce import _reproduce_stages
from dvc.stage.cache import StageCache

logger = logging.getLogger(__name__)


class Model:
    ...

    def execute(
        self,
        output_dir: Path,
        env_name: Optional[str] = None,
        stage_names: Optional[set[str]] = None,
    ):
        dvc_repo = self.configure_dvc_repo(output_dir)
        stages = [
            stage.to_dvc_stage(dvc_repo, env_name) for stage in self.stages.values()
        ]
        index = Index(dvc_repo, stages=stages)
        dvc_repo.lock.lock()
        try:
            if stage_names is not None:
                stages = [s for s in stages if s.name in stage_names]
            _reproduce_stages(
                index.graph,
                stages,
                downstream=False,
                single_item=False,
                on_unchanged=None,
            )
        finally:
            dvc_repo.lock.unlock()

    @staticmethod
    def configure_dvc_repo(output_dir: Path):
        os.makedirs(str(output_dir / "tmp"), exist_ok=True)
        logger.info(f"DVC output result will be located:\n{output_dir}")
        dvc_repo = DVCRepo(
            # root_dir alone does not work because uninitialized=True,
            # so we monkey patch below
            root_dir=str(output_dir),
            uninitialized=True,
            config={"cache": {"dir": str(output_dir / "cache")}},
        )
        dvc_repo.stage_cache = StageCache(dvc_repo)
        dvc_repo.tmp_dir = output_dir / "tmp"
        dvc_repo.root_dir = str(output_dir)
        return dvc_repo

    ...
```
I know that I use a lot of internals, and I am willing to accept the risk of changing my code if you change them. The code works great except for the dvc.yaml path check (this PR). I may not understand the purpose of the path for a stage in the cache object, but I feel it is only there because the create_stage method is reused for convenience; the path could be anything in this particular case, since we only want a Stage object. These are the comments that led me to that thought:
https://github.com/iterative/dvc/blob/d0eda1d5385ba6a78247528327563b536a561b78/dvc/stage/cache.py#L135-L137
This is also why I passed stage.path to _create_stage in the _uncached_outs method, since we only seem to need a "copy" of the stage. Should the "copy" have the same path? I believe that would make more sense.
As you said, "stagecache does not keep track of stage's path and that is also intended to be as such", which means providing the path should not be necessary in that case. So why bother checking it? As I mentioned, I think it is only there for code reusability, but it hurts code clarity and maintainability, and in my case it blocks my ninja use case :).
So I guess the proper modification would be to remove that path argument, or to pass an explicit None value to make it clear that it is not needed here? What do you think?
Would it be possible to share a traceback? It’s not clear where you use stagecache in the above snippet and where in particular the error is raised.
If stagecache does fail, I'd definitely consider it a bug, as it should not depend on the path. dvc.yaml is just for convenience, and we are being defensive as we don't have good support for dangling stages.
If it’s possible, it’d be nice to see a simple reproducible test script for the bug. Thanks.
@skshetry, here are a test script and my traceback.
Test script:
```python
import os
import tempfile
from pathlib import Path

from dvc.dependency import Dependency
from dvc.output import Output
from dvc.repo import Repo
from dvc.repo.index import Index
from dvc.repo.reproduce import _reproduce_stages
from dvc.stage import PipelineStage
from dvc.stage.cache import StageCache


def test_path_stage_cache_bug():
    with tempfile.TemporaryDirectory() as temp_dir:
        output_dir = Path(temp_dir)
        stages = {
            "stage1": {
                "outputs": ["out.txt"],
                "deps": [],
                "cmd": f'echo "Hola" > {output_dir / "out.txt"}',
            },
            "stage2": {
                "outputs": ["out2.txt"],
                "deps": ["out.txt"],
                "cmd": f'echo "Hola" > {output_dir / "out2.txt"}',
            },
            "stage3": {
                "outputs": [],
                "deps": ["out2.txt"],
                "cmd": 'echo "Hi!"',
            },
        }
        dvc_repo = configure_dvc_repo(output_dir)
        stages = [
            to_dvc_stage(dvc_repo, stage_name, stage_def)
            for stage_name, stage_def in stages.items()
        ]
        index = Index(dvc_repo, stages=stages)
        dvc_repo.lock.lock()
        try:
            _reproduce_stages(
                index.graph,
                stages,
                downstream=False,
                single_item=False,
                on_unchanged=None,
            )
        finally:
            dvc_repo.lock.unlock()


def configure_dvc_repo(output_dir: Path) -> Repo:
    os.makedirs(str(output_dir / "tmp"), exist_ok=True)
    dvc_repo = Repo(
        # root_dir alone does not work because uninitialized=True,
        # so we monkey patch below
        root_dir=str(output_dir),
        uninitialized=True,
        config={"cache": {"dir": str(output_dir / "cache")}},
    )
    dvc_repo.stage_cache = StageCache(dvc_repo)
    dvc_repo.tmp_dir = output_dir / "tmp"
    dvc_repo.root_dir = str(output_dir)
    return dvc_repo


def to_dvc_stage(
    dvc_repo: Repo, dvc_stage_name: str, stage_def: dict
) -> PipelineStage:
    dvc_stage = PipelineStage(
        dvc_repo,
        name=dvc_stage_name,
        path=str(Path(dvc_repo.root_dir) / "dvc.yaml"),
        cmd=stage_def["cmd"],
        wdir=str(Path(dvc_repo.root_dir)),
        md5=None,
        locked=False,  # backward compatibility
        frozen=False,
        always_changed=False,
        stage_text=dvc_stage_name,
        dvcfile=None,
        desc=None,
        meta=None,
    )
    add_dependencies(dvc_stage, stage_def["deps"])
    add_outputs(dvc_stage, stage_def["outputs"])
    return dvc_stage


def add_outputs(dvc_stage: PipelineStage, outputs):
    outs = [
        Output(
            dvc_stage,
            str(Path(dvc_stage.repo.root_dir) / p),
            repo=dvc_stage.repo,
            cache=True,
        )
        for p in outputs
    ]
    dvc_stage.outs.extend(outs)


def add_dependencies(dvc_stage: PipelineStage, deps):
    deps = [
        Dependency(
            dvc_stage,
            str(Path(dvc_stage.repo.root_dir) / p),
            repo=dvc_stage.repo,
            cache=True,
        )
        for p in deps
    ]
    dvc_stage.deps.extend(deps)
```
Traceback:
FAILED [100%]2022-07-25 13:40:41,742 DEBUG: Output 'out.txt' of stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1' changed because it is 'deleted'.
2022-07-25 13:40:41,744 DEBUG: stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1' changed.
2022-07-25 13:40:41,754 DEBUG: Removing output 'out.txt' of stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1'.
Running stage 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1':
> echo "Hola" > C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\out.txt
2022-07-25 13:40:41,790 DEBUG: Computed stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1' md5: '2a975831c622102a3b6c2df6adc63835'
2022-07-25 13:40:41,836 DEBUG: stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1' was reproduced
Generating lock file 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.lock'
Updating lock file 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.lock'
2022-07-25 13:40:41,900 DEBUG: Dependency 'out.txt' of stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2' changed because it is 'modified'.
2022-07-25 13:40:41,905 DEBUG: stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2' changed.
2022-07-25 13:40:41,914 DEBUG: Removing output 'out2.txt' of stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'.
2022-07-25 13:40:41,923 DEBUG: {'out.txt': 'modified'}
Running stage 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2':
> echo "Hola" > C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\out2.txt
2022-07-25 13:40:41,957 DEBUG: Computed stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2' md5: 'fdc834ed673b9c0d9509405feeb1bf6a'
tests\test_path_stage_cache_bug.py:12 (test_path_stage_cache_bug)
G = <networkx.classes.digraph.DiGraph object at 0x0000023A7AA527D0>
stages = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage3']
downstream = False, single_item = False, on_unchanged = None, kwargs = {}
steps = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage3']
force_downstream = False
result = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1']
unchanged = []
ret = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1']
checkpoint_func = None
stage = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
def _reproduce_stages(
G, stages, downstream=False, single_item=False, on_unchanged=None, **kwargs
):
r"""Derive the evaluation of the given node for the given graph.
When you _reproduce a stage_, you want to _evaluate the descendants_
to know if it make sense to _recompute_ it. A post-ordered search
will give us an order list of the nodes we want.
For example, let's say that we have the following pipeline:
E
/ \
D F
/ \ \
B C G
\ /
A
The derived evaluation of D would be: [A, B, C, D]
In case that `downstream` option is specified, the desired effect
is to derive the evaluation starting from the given stage up to the
ancestors. However, the `networkx.ancestors` returns a set, without
any guarantee of any order, so we are going to reverse the graph and
use a reverse post-ordered search using the given stage as a starting
point.
E A
/ \ / \
D F B C G
/ \ \ --- reverse --> \ / /
B C G D F
\ / \ /
A E
The derived evaluation of _downstream_ B would be: [B, D, E]
"""
steps = _get_steps(G, stages, downstream, single_item)
force_downstream = kwargs.pop("force_downstream", False)
result = []
unchanged = []
# `ret` is used to add a cosmetic newline.
ret = []
checkpoint_func = kwargs.pop("checkpoint_func", None)
for stage in steps:
if ret:
logger.info("")
if checkpoint_func:
kwargs["checkpoint_func"] = partial(
_repro_callback, checkpoint_func, unchanged
)
from dvc.stage.monitor import CheckpointKilledError
try:
> ret = _reproduce_stage(stage, **kwargs)
..\dvc\repo\reproduce.py:204:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
stage = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
kwargs = {}
_run_callback = <function _reproduce_stage.<locals>._run_callback at 0x0000023A7A9D57E0>
checkpoint_func = None
def _reproduce_stage(stage: "Stage", **kwargs):
def _run_callback(repro_callback):
_dump_stage(stage)
_track_stage(stage)
repro_callback([stage])
checkpoint_func = kwargs.pop("checkpoint_func", None)
if stage.is_checkpoint:
if checkpoint_func:
kwargs["checkpoint_func"] = partial(_run_callback, checkpoint_func)
else:
raise DvcException(
"Checkpoint stages are not supported in 'dvc repro'. "
"Checkpoint stages must be reproduced with 'dvc exp run'. "
)
if stage.frozen and not stage.is_import:
logger.warning(
"%s is frozen. Its dependencies are"
" not going to be reproduced.",
stage,
)
> stage = stage.reproduce(**kwargs)
..\dvc\repo\reproduce.py:41:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2',)
kwargs = {}
call = <Call Stage.reproduce(stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2')>
def wrapper(*args, **kwargs):
call = Call(func, args, kwargs)
> return deco(call, *dargs, **dkwargs)
..\venv\lib\site-packages\funcy\decorators.py:45:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
call = <Call Stage.reproduce(stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2')>
read = ['deps'], write = ['outs']
@decorator
def rwlocked(call, read=None, write=None):
import sys
from dvc.dependency.repo import RepoDependency
from dvc.rwlock import rwlock
if read is None:
read = []
if write is None:
write = []
stage = call._args[0] # pylint: disable=protected-access
assert stage.repo.lock.is_locked
def _chain(names):
return [
item.fs_path
for attr in names
for item in getattr(stage, attr)
# There is no need to lock RepoDependency deps, as there is no
# corresponding OutputREPO, so we can't even write it.
if not isinstance(item, RepoDependency)
]
cmd = " ".join(sys.argv)
with rwlock(stage.repo.tmp_dir, cmd, _chain(read), _chain(write)):
> return call()
..\dvc\stage\decorators.py:36:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Call Stage.reproduce(stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2')>
a = (), kw = {}
def __call__(self, *a, **kw):
if not a and not kw:
> return self._func(*self._args, **self._kwargs)
..\venv\lib\site-packages\funcy\decorators.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
interactive = False, kwargs = {}
msg = "Going to reproduce stage: 'C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e\\dvc.yaml:stage2'. Are you sure you want to continue?"
@rwlocked(read=["deps"], write=["outs"])
def reproduce(self, interactive=False, **kwargs):
if not (kwargs.get("force", False) or self.changed()):
if not isinstance(self, PipelineStage) and self.is_data_source:
logger.info("'%s' didn't change, skipping", self.addressing)
else:
logger.info(
"Stage '%s' didn't change, skipping", self.addressing
)
return None
msg = (
"Going to reproduce {stage}. "
"Are you sure you want to continue?".format(stage=self)
)
if interactive and not prompt.confirm(msg):
raise DvcException("reproduction aborted by the user")
> self.run(**kwargs)
..\dvc\stage\__init__.py:430:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2',)
kwargs = {}
call = <Call Stage.run(stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2')>
def wrapper(*args, **kwargs):
call = Call(func, args, kwargs)
> return deco(call, *dargs, **dkwargs)
..\venv\lib\site-packages\funcy\decorators.py:45:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
call = <Call Stage.run(stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2')>
read = ['deps', 'outs'], write = []
@decorator
def rwlocked(call, read=None, write=None):
import sys
from dvc.dependency.repo import RepoDependency
from dvc.rwlock import rwlock
if read is None:
read = []
if write is None:
write = []
stage = call._args[0] # pylint: disable=protected-access
assert stage.repo.lock.is_locked
def _chain(names):
return [
item.fs_path
for attr in names
for item in getattr(stage, attr)
# There is no need to lock RepoDependency deps, as there is no
# corresponding OutputREPO, so we can't even write it.
if not isinstance(item, RepoDependency)
]
cmd = " ".join(sys.argv)
with rwlock(stage.repo.tmp_dir, cmd, _chain(read), _chain(write)):
> return call()
..\dvc\stage\decorators.py:36:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Call Stage.run(stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2')>
a = (), kw = {}
def __call__(self, *a, **kw):
if not a and not kw:
> return self._func(*self._args, **self._kwargs)
..\venv\lib\site-packages\funcy\decorators.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
dry = False, no_commit = False, force = False, allow_missing = False
kwargs = {}
@rwlocked(read=["deps", "outs"])
def run(
self,
dry=False,
no_commit=False,
force=False,
allow_missing=False,
**kwargs,
):
if (self.cmd or self.is_import) and not self.frozen and not dry:
self.remove_outs(ignore_remove=False, force=False)
if not self.frozen and self.is_import:
jobs = kwargs.get("jobs", None)
self._sync_import(dry, force, jobs)
elif not self.frozen and self.cmd:
self._run_stage(dry, force, **kwargs)
else:
args = (
("outputs", "frozen ") if self.frozen else ("data sources", "")
)
logger.info("Verifying %s in %s%s", *args, self)
if not dry:
self._check_missing_outputs()
if not dry:
if kwargs.get("checkpoint_func", None):
allow_missing = True
> self.save(allow_missing=allow_missing)
..\dvc\stage\__init__.py:549:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
allow_missing = False
def save(self, allow_missing=False):
self.save_deps(allow_missing=allow_missing)
self.save_outs(allow_missing=allow_missing)
self.md5 = self.compute_md5()
> self.repo.stage_cache.save(self)
..\dvc\stage\__init__.py:463:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <dvc.stage.cache.StageCache object at 0x0000023A7A631C90>
stage = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
def save(self, stage):
from .serialize import to_single_stage_lockfile
if not _can_hash(stage):
return
cache_key = _get_stage_hash(stage)
cache = to_single_stage_lockfile(stage)
cache_value = _get_cache_hash(cache)
existing_cache = self._load_cache(cache_key, cache_value)
cache = existing_cache or cache
> for out in self._uncached_outs(stage, cache):
..\dvc\stage\cache.py:163:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <dvc.stage.cache.StageCache object at 0x0000023A7A631C90>
stage = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
cache = OrderedDict([('cmd', 'echo "Hola" > C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e\\out2.txt'), ('deps', [Orde...e', 9)])]), ('outs', [OrderedDict([('path', 'out2.txt'), ('md5', '16b8035ed285eddf828dbd92f177f111'), ('size', 9)])])])
def _uncached_outs(self, stage, cache):
# NOTE: using temporary stage to avoid accidentally modifying original
# stage and to workaround `commit/checkout` not working for uncached
# outputs.
> cached_stage = self._create_stage(cache, wdir=stage.wdir)
..\dvc\stage\cache.py:138:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <dvc.stage.cache.StageCache object at 0x0000023A7A631C90>
cache = OrderedDict([('cmd', 'echo "Hola" > C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e\\out2.txt'), ('deps', [Orde...e', 9)])]), ('outs', [OrderedDict([('path', 'out2.txt'), ('md5', '16b8035ed285eddf828dbd92f177f111'), ('size', 9)])])])
wdir = 'C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e'
def _create_stage(self, cache, wdir=None):
from . import PipelineStage, create_stage
from .loader import StageLoader
> stage = create_stage(
PipelineStage,
repo=self.repo,
path="dvc.yaml",
cmd=cache["cmd"],
wdir=wdir,
outs=[out["path"] for out in cache["outs"]],
external=True,
)
..\dvc\stage\cache.py:113:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'dvc.stage.PipelineStage'>
repo = Repo: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e'
path = 'D:\\dvc\\tests\\dvc.yaml', external = True
kwargs = {'cmd': 'echo "Hola" > C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e\\out2.txt', 'outs': ['out2.txt'], 'wdir': 'C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e'}
check_dvcfile_path = <function check_dvcfile_path at 0x0000023A7AA8D630>
wdir = 'C:\\Users\\MARCAN~1\\AppData\\Local\\Temp\\tmpgzxls30e'
def create_stage(cls, repo, path, external=False, **kwargs):
from dvc.dvcfile import check_dvcfile_path
wdir = os.path.abspath(kwargs.get("wdir", None) or os.curdir)
path = os.path.abspath(path)
check_dvcfile_path(repo, path)
check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir"))
> check_stage_path(repo, os.path.dirname(path))
..\dvc\stage\__init__.py:83:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
repo = Repo: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e'
path = 'D:\\dvc\\tests', is_wdir = False
def check_stage_path(repo, path, is_wdir=False):
from dvc.utils.fs import path_isin
assert repo is not None
error_msg = "{wdir_or_path} '{path}' {{}}".format(
wdir_or_path="stage working dir" if is_wdir else "file path", path=path
)
real_path = os.path.realpath(path)
if not os.path.exists(real_path):
raise StagePathNotFoundError(error_msg.format("does not exist"))
if not os.path.isdir(real_path):
raise StagePathNotDirectoryError(error_msg.format("is not directory"))
proj_dir = os.path.realpath(repo.root_dir)
if real_path != proj_dir and not path_isin(real_path, proj_dir):
> raise StagePathOutsideError(error_msg.format("is outside of DVC repo"))
E dvc.stage.exceptions.StagePathOutsideError: file path 'D:\dvc\tests' is outside of DVC repo
..\dvc\stage\utils.py:42: StagePathOutsideError
The above exception was the direct cause of the following exception:
def test_path_stage_cache_bug():
with tempfile.TemporaryDirectory() as temp_dir:
output_dir = Path(temp_dir)
stages = {
"stage1": {
"outputs": ["out.txt"],
"deps": [],
"cmd": f'echo "Hola" > {output_dir/"out.txt"}',
},
"stage2": {
"outputs": ["out2.txt"],
"deps": ["out.txt"],
"cmd": f'echo "Hola" > {output_dir/"out2.txt"}',
},
"stage3": {
"outputs": [],
"deps": ["out2.txt"],
"cmd": 'echo "Hi!"',
},
}
dvc_repo = configure_dvc_repo(output_dir)
stages = [
to_dvc_stage(dvc_repo, stage_name, stage_def)
for stage_name, stage_def in stages.items()
]
index = Index(dvc_repo, stages=stages)
dvc_repo.lock.lock()
try:
> _reproduce_stages(
index.graph,
stages,
downstream=False,
single_item=False,
on_unchanged=None,
**{},
)
test_path_stage_cache_bug.py:46:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
G = <networkx.classes.digraph.DiGraph object at 0x0000023A7AA527D0>
stages = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage3']
downstream = False, single_item = False, on_unchanged = None, kwargs = {}
steps = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2', Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage3']
force_downstream = False
result = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1']
unchanged = []
ret = [Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage1']
checkpoint_func = None
stage = Stage: 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
def _reproduce_stages(
G, stages, downstream=False, single_item=False, on_unchanged=None, **kwargs
):
r"""Derive the evaluation of the given node for the given graph.
When you _reproduce a stage_, you want to _evaluate the descendants_
to know if it make sense to _recompute_ it. A post-ordered search
will give us an order list of the nodes we want.
For example, let's say that we have the following pipeline:
E
/ \
D F
/ \ \
B C G
\ /
A
The derived evaluation of D would be: [A, B, C, D]
In case that `downstream` option is specified, the desired effect
is to derive the evaluation starting from the given stage up to the
ancestors. However, the `networkx.ancestors` returns a set, without
any guarantee of any order, so we are going to reverse the graph and
use a reverse post-ordered search using the given stage as a starting
point.
E A
/ \ / \
D F B C G
/ \ \ --- reverse --> \ / /
B C G D F
\ / \ /
A E
The derived evaluation of _downstream_ B would be: [B, D, E]
"""
steps = _get_steps(G, stages, downstream, single_item)
force_downstream = kwargs.pop("force_downstream", False)
result = []
unchanged = []
# `ret` is used to add a cosmetic newline.
ret = []
checkpoint_func = kwargs.pop("checkpoint_func", None)
for stage in steps:
if ret:
logger.info("")
if checkpoint_func:
kwargs["checkpoint_func"] = partial(
_repro_callback, checkpoint_func, unchanged
)
from dvc.stage.monitor import CheckpointKilledError
try:
ret = _reproduce_stage(stage, **kwargs)
if len(ret) == 0:
unchanged.extend([stage])
elif force_downstream:
# NOTE: we are walking our pipeline from the top to the
# bottom. If one stage is changed, it will be reproduced,
# which tells us that we should force reproducing all of
# the other stages down below, even if their direct
# dependencies didn't change.
kwargs["force"] = True
if ret:
result.extend(ret)
except CheckpointKilledError:
raise
except Exception as exc:
> raise ReproductionError(stage.addressing) from exc
E dvc.exceptions.ReproductionError: failed to reproduce 'C:\Users\MARCAN~1\AppData\Local\Temp\tmpgzxls30e\dvc.yaml:stage2'
..\dvc\repo\reproduce.py:221: ReproductionError
@skshetry any feedback on this? Were you able to reproduce using the provided script?
Hi, @gbmarc1. Sorry, I haven't been able to go through this. I'll try getting to it this week.
Would it be possible to make the outputs/dependencies absolute? i.e.

```python
def test_repro(tmp_dir, make_tmp_dir, dvc):
    working_dir = make_tmp_dir("working_dir")
    working_dir.gen("foo", "foo")

    with dvc.lock:
        stage = PipelineStage(
            repo=dvc,
            name="stage",
            path=os.fspath(tmp_dir / "dvc.yaml"),
            cmd=f"cp {working_dir / 'foo'} {working_dir / 'bar'}",
            wdir=os.fspath(tmp_dir),
        )
        stage.deps.append(Output(stage, os.fspath(working_dir / "foo")))
        stage.outs.append(Output(stage, os.fspath(working_dir / "bar")))
        stage.reproduce()
```
Closing as stale.