1
0
mirror of https://github.com/SpaceVim/SpaceVim.git synced 2025-04-14 15:19:12 +08:00

feat(defx): add bundle defx-sftp

ref: https://github.com/SpaceVim/SpaceVim/issues/4516
This commit is contained in:
wsdjeg 2022-04-13 09:48:14 +08:00
parent 5ccbb5c6a2
commit f9755af4c5
10 changed files with 590 additions and 0 deletions

View File

@ -81,6 +81,7 @@ function! SpaceVim#layers#core#plugins() abort
call add(plugins, [g:_spacevim_root_dir . 'bundle/defx.nvim',{'merged' : 0, 'loadconf' : 1 , 'loadconf_before' : 1}])
call add(plugins, [g:_spacevim_root_dir . 'bundle/defx-git',{'merged' : 0, 'loadconf' : 1}])
call add(plugins, [g:_spacevim_root_dir . 'bundle/defx-icons',{'merged' : 0}])
call add(plugins, [g:_spacevim_root_dir . 'bundle/defx-sftp',{'merged' : 0}])
endif
if !g:spacevim_vimcompatible

129
bundle/defx-sftp/.gitignore vendored Normal file
View File

@ -0,0 +1,129 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/

21
bundle/defx-sftp/LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 matsui54
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,30 @@
## About
Defx-sftp is a defx source for sftp.
## Features
- View and operate remote files via SFTP.
- Exchange files between remote and local.
## Requirements
For basic requirements, please follow the [instruction of defx.nvim](https://github.com/Shougo/defx.nvim#requirements).\
Additionally, defx-sftp requires [paramiko](http://www.paramiko.org/).\
You can install it with pip:
pip3 install --user paramiko
## Usage
For now, defx-sftp only supports RSA authentication.
Private key path can be specified with `g:defx_sftp#key_path` (default is ~/.ssh/id_rsa).
Remote files can be accessed like this.
``` vim
Defx sftp://[user@]hostname[:port][/path]
```
Columns for sftp files is supported (`sftp_mark`, `sftp_time`, `sftp_time`).\
If you want to show the same columns as defx's one, you can configure like this and open with `Defx sftp://user@hostname -buffer-name=sftp`.
```vim
call defx#custom#option('sftp', {
\ 'columns': 'sftp_mark:indent:icon:filename:type:sftp_time:sftp_size',
\ })
```

View File

@ -0,0 +1,28 @@
from pynvim import Nvim
import typing
from defx.column.mark import Column as Base, Highlights
from defx.context import Context
from defx.util import Candidate, len_bytes
class Column(Base):
def __init__(self, vim: Nvim) -> None:
super().__init__(vim)
self.name = 'sftp_mark'
def get_with_highlights(
self, context: Context, candidate: Candidate
) -> typing.Tuple[str, Highlights]:
candidate_path = candidate['action__path']
if candidate['is_selected']:
return (str(self.vars['selected_icon']),
[(f'{self.highlight_name}_selected',
self.start, len_bytes(self.vars['selected_icon']))])
elif (candidate['is_root'] and not candidate_path.is_dir()):
return (str(self.vars['readonly_icon']),
[(f'{self.highlight_name}_readonly',
self.start, len_bytes(self.vars['readonly_icon']))])
return (' ' * self.vars['length'], [])

View File

@ -0,0 +1,25 @@
from pynvim import Nvim
import typing
from defx.column.size import Column as Base, Highlights
from defx.context import Context
from defx.util import Candidate
class Column(Base):
def __init__(self, vim: Nvim) -> None:
super().__init__(vim)
self.name = 'sftp_size'
def get_with_highlights(
self, context: Context, candidate: Candidate
) -> typing.Tuple[str, Highlights]:
path = candidate['action__path']
if path.is_dir():
return (' ' * self._length, [])
size = self._get_size(path.stat().st_size)
text = '{:>6s}{:>3s}'.format(size[0], size[1])
highlight = f'{self.highlight_name}_{size[1]}'
return (text, [(highlight, self.start, self._length)])

View File

@ -0,0 +1,23 @@
from pynvim import Nvim
import time
import typing
from defx.column.time import Column as Base, Highlights
from defx.context import Context
from defx.util import Candidate
class Column(Base):
def __init__(self, vim: Nvim) -> None:
super().__init__(vim)
self.name = 'sftp_time'
def get_with_highlights(
self, context: Context, candidate: Candidate
) -> typing.Tuple[str, Highlights]:
path = candidate['action__path']
text = time.strftime(self.vars['format'],
time.localtime(path.stat().st_mtime))
return (text, [(self.highlight_name, self.start, self._length)])

View File

@ -0,0 +1,114 @@
from pathlib import Path
from urllib.parse import urlparse
import site
from pynvim import Nvim
from paramiko import SFTPClient
from defx.action import ActionAttr
from defx.kind.file import Kind as Base
from defx.base.kind import action
from defx.clipboard import ClipboardAction
from defx.context import Context
from defx.defx import Defx
from defx.view import View
site.addsitedir(str(Path(__file__).parent.parent))
from sftp import SFTPPath # noqa: E402
from source.sftp import Source # noqa: E402
class Kind(Base):
def __init__(self, vim: Nvim, source) -> None:
self.vim = vim
self.name = 'sftp'
self._source: Source = source
@property
def client(self) -> SFTPClient:
return self._source.client
def is_readable(self, path: SFTPPath) -> bool:
pass
def get_home(self) -> SFTPPath:
return SFTPPath(self.client, self.client.normalize('.'))
def path_maker(self, path: str) -> SFTPPath:
path = urlparse(path).path
if not path:
path = self._source.client.normalize('.')
return SFTPPath(self.client, path)
def rmtree(self, path: SFTPPath) -> None:
path.rmdir_recursive()
def get_buffer_name(self, path: str) -> str:
# TODO: return 'sftp://{}@{}'
pass
def paste(self, view: View, src: SFTPPath, dest: SFTPPath,
cwd: str) -> None:
action = view._clipboard.action
if view._clipboard.source_name == 'file':
if action == ClipboardAction.COPY:
self._put_recursive(src, dest, self.client)
elif action == ClipboardAction.MOVE:
pass
elif action == ClipboardAction.LINK:
pass
view._vim.command('redraw')
return
if action == ClipboardAction.COPY:
if src.is_dir():
src.copy_recursive(dest)
else:
src.copy(dest)
elif action == ClipboardAction.MOVE:
src.rename(dest)
# Check rename
# TODO: add prefix
if not src.is_dir():
view._vim.call('defx#util#buffer_rename',
view._vim.call('bufnr', str(src)), str(dest))
elif action == ClipboardAction.LINK:
# Create the symbolic link to dest
# dest.symlink_to(src, target_is_directory=src.is_dir())
pass
@action(name='copy')
def _copy(self, view: View, defx: Defx, context: Context) -> None:
super()._copy(view, defx, context)
def copy_to_local(path: str, dest: str):
client = defx._source.client
self._copy_recursive(SFTPPath(client, path), Path(dest), client)
view._clipboard.paster = copy_to_local
@action(name='remove_trash', attr=ActionAttr.REDRAW)
def _remove_trash(self, view: View, defx: Defx, context: Context) -> None:
view.print_msg('remove_trash is not supported')
def _copy_recursive(self, path: SFTPPath, dest: Path, client) -> None:
''' copy remote files to the local host '''
if path.is_file():
client.get(str(path), str(dest))
else:
dest.mkdir(parents=True)
for f in path.iterdir():
new_dest = dest.joinpath(f.name)
self._copy_recursive(f, new_dest, client)
def _put_recursive(self, path: Path, dest: SFTPPath,
client: SFTPClient) -> None:
''' copy local files to the remote host '''
if path.is_file():
client.put(str(path), str(dest))
else:
dest.mkdir()
for f in path.iterdir():
new_dest = dest.joinpath(f.name)
self._put_recursive(f, new_dest, client)

View File

@ -0,0 +1,115 @@
from __future__ import annotations
import typing
from pathlib import PurePosixPath
import stat
from paramiko import SFTPClient, SFTPAttributes
class SFTPPath(PurePosixPath):
def __new__(cls, client: SFTPClient, path: str,
stat: SFTPAttributes = None):
self = super().__new__(cls, path)
self.client: SFTPClient = client
self.path: str = path
self._stat: SFTPAttributes = stat
return self
def __eq__(self, other):
return self.__str__() == str(other)
def __str__(self):
return self.path
def copy(self, dest: SFTPPath) -> None:
fl = self.client.open(self.path)
self.client.putfo(fl, str(dest))
def copy_recursive(self, dest: SFTPPath) -> None:
if self.is_file():
self.copy(dest)
else:
dest.mkdir()
for f in self.iterdir():
new_dest = dest.joinpath(f.name)
f.copy_recursive(new_dest)
def exists(self):
try:
return bool(self.stat())
except FileNotFoundError:
return False
def is_dir(self) -> bool:
return not self.is_file()
def is_file(self) -> bool:
mode = self.stat().st_mode
return stat.S_ISREG(mode)
def is_symlink(self) -> bool:
mode = self.stat().st_mode
return stat.S_ISLNK(mode)
def iterdir(self) -> typing.Iterable(SFTPPath):
for f in self.client.listdir_attr(self.path):
yield self.joinpath(f.filename, f)
def joinpath(self, name: str, stat: SFTPAttributes = None):
sep = '' if self.path == '/' else '/'
new_path = self.path + sep + name
return SFTPPath(self.client, new_path, stat)
def mkdir(self, parents=False, exist_ok=False):
# TODO: mkdir recursively
self.client.mkdir(self.path)
@property
def parent(self):
if self.path == '/':
return self
parts = self.path.split('/')
new_path = '/'.join(parts[:-1])
if not new_path:
new_path = '/'
return SFTPPath(self.client, new_path)
def relative_to(self, other) -> SFTPPath:
return self
def rename(self, new: SFTPPath) -> SFTPPath:
self.client.rename(self.path, new.path)
def resolve(self) -> SFTPPath:
client = self.client
new_path = client.normalize(self.path)
return SFTPPath(client, new_path)
def rmdir(self):
"""
Remove directory. Directory must be empty.
"""
self.client.rmdir(self.path)
def rmdir_recursive(self):
if self.is_file():
self.unlink()
else:
for f in self.iterdir():
f.rmdir_recursive()
self.rmdir()
def stat(self) -> SFTPAttributes:
if self._stat:
return self._stat
else:
return self.client.stat(self.path)
def touch(self, exist_ok=True):
self.client.open(self.path, mode='x')
def unlink(self, missing_ok=False):
self.client.unlink(self.path)
if __name__ == '__main__':
print(SFTPPath.parse_path('//hoge@13.4.3'))

View File

@ -0,0 +1,104 @@
from pathlib import Path
import site
import typing
from urllib.parse import urlparse
from pynvim import Nvim
from paramiko import Transport, SFTPClient, RSAKey, SSHConfig
from defx.context import Context
from defx.base.source import Base
site.addsitedir(str(Path(__file__).parent.parent))
from sftp import SFTPPath # noqa: E402
class Source(Base):
def __init__(self, vim: Nvim) -> None:
super().__init__(vim)
self.name = 'sftp'
self.client: SFTPClient = None
self.config: SSHConfig = None
from kind.sftp import Kind
self.kind: Kind = Kind(self.vim, self)
self.username: str = ''
self.hostname: str = ''
self.vars = {
'root': None,
}
def init_client(self, hostname, username, port=None) -> None:
self.username = username
self.hostname = hostname
key_path = ''
conf_path = Path("~/.ssh/config").expanduser()
if conf_path.exists():
self.config = SSHConfig.from_path(conf_path)
conf = self.config.lookup(hostname)
if "identityfile" in conf:
key_path = conf["identityfile"][0]
port = conf.get("port", 22)
if not key_path:
key_path = self.vim.vars.get(
"defx_sftp#key_path", str(Path("~/.ssh/id_rsa").expanduser())
)
if port is None:
port = 22
transport = Transport((hostname, int(port)))
rsa_private_key = RSAKey.from_private_key_file(key_path)
transport.connect(username=username, pkey=rsa_private_key)
self.client = SFTPClient.from_transport(transport)
def get_root_candidate(
self, context: Context, path: Path
) -> typing.Dict[str, typing.Any]:
path_str = self._parse_arg(str(path))
path = SFTPPath(self.client, path_str)
word = str(path)
if word[-1:] != '/':
word += '/'
if self.vars['root']:
word = self.vim.call(self.vars['root'], str(path))
word = word.replace('\n', '\\n')
return {
'word': word,
'is_directory': True,
'action__path': path,
}
def gather_candidates(
self, context: Context, path: Path
) -> typing.List[typing.Dict[str, typing.Any]]:
path_str = self._parse_arg(str(path))
path = SFTPPath(self.client, path_str)
candidates = []
for f in path.iterdir():
candidates.append({
'word': f.name + ('/' if f.is_dir() else ''),
'is_directory': f.is_dir(),
'action__path': f,
})
return candidates
def _parse_arg(self, path: str) -> str:
parsed = urlparse(path)
uname = parsed.username
hname = parsed.hostname
if hname is None:
return parsed.path
if uname is None:
uname = ''
if (uname != self.username or
hname != self.hostname):
self.init_client(hname, uname, parsed.port)
rmt_path = parsed.path
if not rmt_path:
rmt_path = '.'
return self.client.normalize(rmt_path)