1
0
mirror of https://github.com/SpaceVim/SpaceVim.git synced 2025-02-03 06:50:05 +08:00
SpaceVim/bundle/deoplete-go/rplugin/python3/deoplete/sources/deoplete_go.py

365 lines
12 KiB
Python

import os
import re
import platform
import subprocess
from collections import OrderedDict
from deoplete.base.source import Base
from deoplete.util import charpos2bytepos, expand, getlines, load_external_module
load_external_module(__file__, "sources/deoplete_go")
from cgo import cgo
from stdlib import stdlib
try:
load_external_module(__file__, "")
from ujson import loads
except ImportError:
from json import loads
# Operating-system names accepted as GOOS values by this plugin.
# from https://github.com/golang/go/blob/go1.13beta1/src/go/build/syslist.go
known_goos = (
    "aix", "android", "appengine", "darwin", "dragonfly", "freebsd",
    "hurd", "illumos", "js", "linux", "nacl", "netbsd", "openbsd",
    "plan9", "solaris", "windows", "zos",
)
class Source(Base):
def __init__(self, vim):
    """Set the static attributes that describe this completion source.

    Args:
        vim: Editor handle, forwarded unchanged to the base class.
    """
    super(Source, self).__init__(vim)

    self.name, self.mark = "go", "[Go]"
    self.filetypes = ["go"]
    self.rank = 500
    # An identifier, "]" or ")" followed by "." and an optional partial
    # identifier.
    self.input_pattern = r"(?:\b[^\W\d]\w*|[\]\)])\.(?:[^\W\d]\w*)?"
def on_init(self, context):
    """Load the ``deoplete#sources#go#*`` options and prepare source state.

    Every option found in ``context["vars"]`` is copied onto an attribute
    of the same short name; missing options fall back to the defaults
    below.  When cgo completion is requested, the clang bindings are
    loaded and an index is created.

    Args:
        context: Mapping whose ``"vars"`` entry is a dict-like object
            holding the user's option values.

    Side effects:
        * If the ``pointer`` option is truthy, ``self.complete_pos`` and
          ``self.input_pattern`` are extended so that ``*`` triggers
          completion.
        * If ``cgo`` is requested but ``cgo#libclang_path`` is empty, cgo
          completion is switched off (``self.cgo = False``) and an error
          is reported, instead of leaving the cgo attributes undefined.
    """
    settings = context["vars"]
    prefix = "deoplete#sources#go#"

    def option(name, default):
        # Value of the prefixed option, or ``default`` when it is not set.
        return settings.get(prefix + name, default)

    self.gocode_binary = ""
    if prefix + "gocode_binary" in settings:
        self.gocode_binary = expand(settings[prefix + "gocode_binary"])

    self.package_dot = option("package_dot", False)
    self.sort_class = option("sort_class", [])
    self.pointer = option("pointer", False)
    self.auto_goos = option("auto_goos", False)
    self.goos = option("goos", "")
    self.goarch = option("goarch", "")
    self.sock = option("sock", "")
    self.cgo = option("cgo", False)
    self.cgo_only = option("cgo_only", False)
    self.source_importer = option("source_importer", False)
    self.builtin_objects = option("builtin_objects", False)
    self.unimported_packages = option("unimported_packages", False)
    self.fallback_to_source = option("fallback_to_source", False)

    self.loaded_gocode_binary = False
    self.complete_pos = re.compile(r'\w*$|(?<=")[./\-\w]*$')
    if self.pointer:
        self.complete_pos = re.compile(self.complete_pos.pattern + r"|\*$")
        # Guard so that a repeated on_init() does not keep growing the
        # pattern.
        if not self.input_pattern.endswith(r"|\*"):
            self.input_pattern += r"|\*"

    if not self.cgo:
        return

    self.libclang_path = option("cgo#libclang_path", "")
    if self.libclang_path == "":
        # Without libclang none of the cgo state below can be created;
        # disable cgo so that later calls do not touch missing attributes.
        self.print_error(
            "deoplete#sources#go#cgo#libclang_path is not set; "
            "cgo completion is disabled"
        )
        self.cgo = False
        return

    load_external_module(__file__, "clang")
    import clang.cindex as clang

    self.cgo_options = {
        "std": option("cgo#std", "c11"),
        # The un-prefixed key is the name this code historically read; it
        # is kept as a fallback for existing configurations.
        "sort_algo": option(
            "cgo#sort_algo", settings.get("deoplete#sources#cgo#sort_algo", None)
        ),
    }

    if (
        not clang.Config.loaded
        and clang.Config.library_path != self.libclang_path
    ):
        clang.Config.set_library_file(self.libclang_path)
        clang.Config.set_compatibility_check(False)

    # Set 'C.' complete pattern
    self.cgo_complete_pattern = re.compile(r"[^\W\d]*C\.")
    # Create clang.cindex.Index database
    self.index = clang.Index.create(0)
    # initialize in-memory cache
    self.cgo_cache, self.cgo_inline_source = dict(), None
def get_complete_position(self, context):
    """Return the index in ``context["input"]`` where completion starts.

    The position is the start of the first match of ``self.complete_pos``;
    ``-1`` is returned when the pattern does not match at all.
    """
    found = self.complete_pos.search(context["input"])
    if found is None:
        return -1
    return found.start()
def gather_candidates(self, context):
    """Return the completion candidates for the current input.

    When cgo completion is enabled and the input matches the cgo pattern,
    the work is delegated to ``cgo_completion``.  Otherwise (unless
    ``cgo_only`` is set) gocode is queried through ``get_complete_result``
    and element 1 of its decoded output is turned into candidate dicts
    with the keys ``word``, ``abbr``, ``kind``, ``info`` and ``dup``.

    If ``self.sort_class`` is non-empty, candidates of class ``"import"``
    come first, followed by the other candidates grouped in the order
    given by ``sort_class``; classes not listed there are dropped.

    Args:
        context: Mapping with at least ``"input"`` and
            ``"complete_position"``.

    Returns:
        A list of candidate dicts; an empty list when nothing can be
        offered or the gocode output has an unexpected shape.
    """
    # The pattern only exists when on_init() finished its cgo setup; look
    # it up defensively so a half-configured cgo does not raise here.
    cgo_pattern = getattr(self, "cgo_complete_pattern", None)
    if (
        self.cgo
        and cgo_pattern is not None
        and cgo_pattern.search(context["input"])
    ):
        # Always hand back a list, even if the cgo helper had nothing.
        return self.cgo_completion(getlines(self.vim)) or []

    if self.cgo_only:
        return []

    bufname = self.vim.current.buffer.name
    if not os.path.isfile(bufname):
        bufname = self.vim.call("tempname")

    result = self.get_complete_result(context, getlines(self.vim), bufname)

    try:
        completions = result[1]
        if completions[0]["class"] == "PANIC":
            self.print_error("gocode panicked")
            return []

        class_dict = None
        if self.sort_class:
            class_dict = OrderedDict((x, []) for x in self.sort_class)

        # Identical for every candidate, so evaluate it once.
        prepend_star = (
            self.pointer
            and str(context["input"][context["complete_position"]:]) == "*"
        )

        out = []
        sep = " "
        for complete in completions:
            word = complete["name"]
            info = complete["type"]
            _class = complete["class"]

            abbr = str(word + sep + info).replace(" func", "", 1)

            if _class == "package" and self.package_dot:
                word += "."
            if prepend_star:
                word = "*" + word

            candidate = dict(word=word, abbr=abbr, kind=_class, info=info, dup=1)

            if class_dict is None or _class == "import":
                out.append(candidate)
            elif _class in class_dict:
                class_dict[_class].append(candidate)

        if class_dict is not None:
            for group in class_dict.values():
                out.extend(group)

        return out
    except (LookupError, TypeError):
        # Best effort: an empty or malformed gocode result yields no
        # candidates instead of an error.
        return []
def cgo_completion(self, buffer):
    """Return cgo completion candidates for the given buffer lines.

    The inline source is extracted once with ``cgo.get_inline_source``.
    If it is unchanged since the previous call and candidates for it are
    already in ``self.cgo_cache``, the cached candidates are returned;
    otherwise ``cgo.complete`` is asked for them.

    Args:
        buffer: Lines of the current buffer.

    Returns:
        The candidates, or an empty list when the first element returned
        by ``cgo.get_inline_source`` is 0 (no include header).
    """
    inline = cgo.get_inline_source(buffer)

    # No include header
    if inline[0] == 0:
        return []

    count, inline_source = inline

    # Same inline source as last time: reuse the in-memory cache
    # (self.cgo_cache) if it already holds candidates for it.
    if (
        self.cgo_inline_source is not None
        and self.cgo_inline_source == inline_source
    ):
        cached = self.cgo_cache.get(inline_source)
        if cached:
            return cached

    self.cgo_inline_source = inline_source
    # return candidates use libclang-python3
    return cgo.complete(
        self.index,
        self.cgo_cache,
        self.cgo_options,
        count,
        self.cgo_inline_source,
    )
def get_complete_result(self, context, buffer, bufname):
    """Run gocode on the buffer and return its decoded JSON output.

    The child environment is a copy of ``os.environ`` with ``GOPATH``
    taken from the editor.  ``GOOS`` is either inferred (``auto_goos``:
    first from ``_``-separated parts of the file name, then from
    ``// +build`` lines that precede the ``package`` clause) or taken
    from ``self.goos``.  ``CGO_ENABLED`` is set to ``"0"`` when the
    resulting ``GOOS`` differs from the host platform, and ``GOARCH`` is
    taken from ``self.goarch`` when set.

    Args:
        context: Completion context, passed to ``get_cursor_offset``.
        buffer: Lines of the buffer; joined with ``"\\n"`` and written to
            gocode's stdin.
        bufname: File name passed to gocode's ``autocomplete`` command.

    Returns:
        The value decoded from gocode's stdout, or an empty list when the
        gocode binary cannot be found or started, or when its output
        cannot be decoded.
    """
    offset = self.get_cursor_offset(context)
    host_os = platform.system().lower()

    env = os.environ.copy()
    env["GOPATH"] = self.vim.eval("$GOPATH")

    if self.auto_goos:
        name = os.path.basename(os.path.splitext(bufname)[0])
        if "_" in name:
            for part in name.rsplit("_", 2):
                if part in known_goos:
                    env["GOOS"] = part
                    break
        if "GOOS" not in env:
            for line in buffer:
                if line.startswith("package "):
                    break
                if not line.startswith("// +build"):
                    continue
                # Keep only the first comma-separated term of each option.
                directives = [x.split(",", 1)[0] for x in line[9:].strip().split()]
                if host_os not in directives:
                    for plat in directives:
                        if plat in known_goos:
                            env["GOOS"] = plat
                            break
    elif self.goos != "":
        env["GOOS"] = self.goos

    if "GOOS" in env and env["GOOS"] != host_os:
        env["CGO_ENABLED"] = "0"

    if self.goarch != "":
        env["GOARCH"] = self.goarch

    gocode = self.find_gocode_binary()
    if not gocode:
        return []

    args = [gocode, "-f=json"]
    if self.source_importer:
        args.append("-source")
    if self.builtin_objects:
        args.append("-builtin")
    if self.unimported_packages:
        args.append("-unimported-packages")
    if self.fallback_to_source:
        args.append("-fallback-to-source")

    # basically, '-sock' option for mdempsky/gocode.
    # probably meaningless in nsf/gocode that already run the rpc server
    if self.sock in ("unix", "tcp", "none"):
        args.append("-sock={}".format(self.sock))

    args += ["autocomplete", bufname, str(offset)]

    try:
        process = subprocess.Popen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            start_new_session=True,
            env=env,
        )
    except OSError as err:
        # The binary vanished or is not executable: report it rather than
        # letting the exception escape.
        self.print_error("failed to run gocode: {}".format(err))
        return []

    stdout_data, stderr_data = process.communicate("\n".join(buffer).encode())

    try:
        return loads(stdout_data.decode())
    except ValueError:
        # Covers both undecodable bytes and invalid JSON.  Decode
        # leniently here so that reporting the failure cannot fail itself.
        self.print_error("gocode decode error")
        self.print_error(stdout_data.decode(errors="replace"))
        self.print_error(stderr_data.decode(errors="replace"))
        return []
def get_cursor_offset(self, context):
    """Return the offset of the completion position to hand to gocode.

    The offset is built from the editor's ``line2byte`` value for the
    cursor line plus the ``charpos2bytepos`` result for the part of
    ``context["input"]`` before ``context["complete_position"]``, minus
    one.
    """
    row = self.vim.current.window.cursor[0]
    col = context["complete_position"]
    line_start = self.vim.call("line2byte", row)

    if self.vim.current.buffer.options["fileformat"] == "dos":
        # Note: line2byte() counts "\r\n" in DOS format, but gocode must
        # see "\n"; drop one byte for every preceding line.
        line_start -= row - 1

    prefix_bytes = charpos2bytepos("utf-8", context["input"][:col], col)
    return line_start + prefix_bytes - 1
def parse_import_package(self, buffer):
    """Extract the imported packages from Go source lines.

    Both single-line imports (``import "fmt"``) and parenthesised import
    blocks are understood, including aliased imports and trailing ``//``
    comments.  Scanning stops at the first ``func``, ``type``, ``var`` or
    ``const`` declaration found outside an import block.

    Args:
        buffer: Iterable of source lines.

    Returns:
        A list with one ``dict(library=..., package=...)`` per import
        path, in source order.  ``package`` is the last ``/``-separated
        element of the path and ``library`` is everything before it as a
        plain string, or ``"none"`` when the path has no directory part.
    """
    quoted_path = re.compile(r'["`]([^"`]+)["`]')
    import_decl = re.compile(r"import\b\s*(.*)")
    other_decls = ("func ", "type ", "var ", "const ")

    packages = []
    in_block = False
    for raw in buffer:
        line = raw.strip()

        if in_block:
            if line.startswith(")"):
                in_block = False
                continue
        else:
            decl = import_decl.match(line)
            if decl is not None:
                line = decl.group(1)
                if line.startswith("("):
                    # Stay in block mode unless it is closed on this line.
                    in_block = ")" not in line
            elif line.startswith(other_decls):
                break
            else:
                # Package clause, blank line, comment, ...
                continue

        # Ignore anything after a line comment, then take the quoted
        # path(s); an alias in front of the path is skipped implicitly.
        code = line.split("//", 1)[0]
        for path in quoted_path.findall(code):
            library, _, package_name = path.rpartition("/")
            packages.append(dict(library=library or "none", package=package_name))

    return packages
def find_gocode_binary(self):
    """Return the path of the gocode executable to run.

    The configured ``self.gocode_binary`` is used whenever it names an
    existing file.  Otherwise ``gocode`` (``gocode.exe`` on Windows) is
    looked up through ``find_binary_path``; a successful lookup is
    remembered so that the search is not repeated on every completion.

    Returns:
        The path, or whatever ``find_binary_path`` returns when nothing
        was found.
    """
    if self.gocode_binary != "" and self.loaded_gocode_binary:
        return self.gocode_binary

    self.loaded_gocode_binary = os.path.isfile(self.gocode_binary)
    if self.loaded_gocode_binary:
        return self.gocode_binary

    # Result of an earlier search; kept apart from the configured path so
    # the configured path still wins once it starts to exist.
    cached = getattr(self, "_gocode_on_path", None)
    if cached:
        return cached

    if platform.system().lower() == "windows":
        found = self.find_binary_path("gocode.exe")
    else:
        found = self.find_binary_path("gocode")

    if found:
        self._gocode_on_path = found
    return found
def find_binary_path(self, path):
    """Locate an executable file.

    Args:
        path: Either a path with a directory part, which is checked as
            given, or a bare file name, which is searched for in each
            entry of the ``PATH`` environment variable.

    Returns:
        The path of the executable.  When none is found, an error is
        reported through ``self.print_error`` and its return value is
        returned.
    """

    def is_exec(bin_path):
        return os.path.isfile(bin_path) and os.access(bin_path, os.X_OK)

    if os.path.dirname(path):
        if is_exec(path):
            return path
    else:
        # PATH may be missing from the environment altogether.
        search_path = os.environ.get("PATH")
        directories = search_path.split(os.pathsep) if search_path else []
        for directory in directories:
            # Entries may be wrapped in double quotes.
            candidate = os.path.join(directory.strip('"'), path)
            if is_exec(candidate):
                return candidate

    return self.print_error(path + " binary not found")