Luke Ross

Colorado

4 releases git clone https://lukeross.name/projects/colorado.git/

Web-based git repository viewer.

fd40eb3715e4b4329997f94485ef0168c3aa88dd / src / colorado /

views.py

import arrow
import chardet
from flask import abort, Blueprint, current_app, make_response, send_file, url_for
from io import BytesIO
from itertools import chain
from lxmlmeld import parse_xml
from os import path
from sqlalchemy.orm.exc import NoResultFound
from tarfile import TarFile, TarInfo, REGTYPE, DIRTYPE
from tempfile import TemporaryFile

from .db import session
from .repo import Repo

# Blueprint that every route in this module is registered on; it is created
# with "static" as its static folder and "templates" as its template folder.
bp = Blueprint(
	"colorado", __name__, static_folder="static", template_folder="templates"
)

# XHTML 1.0 Strict doctype triple, passed as the doctype argument when a
# template is serialised by write_template().
doctype = (
	"html", "-//W3C//DTD XHTML 1.0 Strict//EN",
	"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"
)

# Object id that get_parent_diff() resolves and diffs against when a commit
# has no parents (git's well-known empty tree).
EMPTY_TREE_SHA = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"


def nice_author(c):
	"""
	Method: nice_author

	Builds the name to display as the author of a commit.

	Parameters:

		c - An object exposing author and committer, each with a name

	Returns:

		The author's name alone when author and committer compare equal,
		otherwise "author & committer"
	"""
	writer, pusher = c.author, c.committer
	if writer == pusher:
		return writer.name
	return "{} & {}".format(writer.name, pusher.name)


def nice_size(num):
	"""
	Method: nice_size

	Formats a byte count as a whole number followed by a binary-unit
	suffix ("", "k", "M", "G" or "T"). The value is truncated, not rounded.

	Parameters:

		num - The size to format

	Returns:

		A string such as "512", "3k" or "17M". Sizes beyond the largest
		suffix stay expressed in that suffix (1024**5 gives "1024T").
	"""
	suffixes = ("", "k", "M", "G", "T")
	# Only scale down while a larger suffix is still available; scaling
	# past the last suffix would label a value in the wrong unit.
	for suffix in suffixes[:-1]:
		if num <= 1023:
			return "{}{}".format(int(num), suffix)
		num /= 1024
	return "{}{}".format(int(num), suffixes[-1])


def run_on_meld(tpl, id, fn):
	"""
	Method: run_on_meld

	Applies a callable to a template node, but only if the node exists.

	Parameters:

		tpl - The template (anything offering findmeld)
		id - The meld id to look up
		fn - Callable invoked with the node when it is found

	Returns:

		Whatever fn returns, or None when no node has that id
	"""
	target = tpl.findmeld(id)
	if target is None:
		return None
	return fn(target)


def configure_template(tpl, repo=None):
	"""
	Method: configure_template

	Fills in the parts of a page that are shared between views: the
	static asset links, the page title and, when a repository is given,
	its name, description, clone URL and branch/tag counters.

	Parameters:

		tpl - The parsed template to update in place
		repo - Optional repository the page is about
	"""
	def link_static(attr, filename):
		# The URL is only built once the node is known to exist.
		return lambda node: node.set(attr, url_for(
			".static", filename=filename
		))

	def fill_title(node):
		if repo:
			title = repo.name
		else:
			title = current_app.config.get("SITE_NAME", "Git Viewer")
		node.content(title)

	run_on_meld(tpl, "static-css", link_static("href", "colorado.css"))
	run_on_meld(tpl, "static-js", link_static("src", "colorado.js"))
	run_on_meld(tpl, "html-title", fill_title)
	if not repo:
		return

	def point_at_trees(node, count):
		# The node's first child carries the number; the node itself links
		# to the list of branches and tags.
		node[0].content(str(count))
		node.set("href", url_for(".trees_view", slug=repo.slug))

	def discard(node):
		node.deparent()

	git = repo.repo
	# Unlike the optional nodes below, repo-name must be in the template.
	name_node = tpl.findmeld("repo-name")
	name_node.content(repo.name)
	name_node.set("href", url_for(".repo_home", slug=repo.slug))
	run_on_meld(tpl, "repo-desc", lambda n: n.content(repo.description))
	run_on_meld(tpl, "repo-clone", lambda n: n.content(repo.read_clone_url))
	run_on_meld(tpl, "repo-issues", discard)  # FIXME

	# The branch counter is only shown when there is more than one branch.
	branch_total = len(git.heads)
	if branch_total > 1:
		run_on_meld(
			tpl, "repo-branches", lambda n: point_at_trees(n, branch_total)
		)
	else:
		run_on_meld(tpl, "repo-branches", discard)

	# The tag counter is shown as soon as there is at least one tag.
	if git.tags:
		run_on_meld(
			tpl, "repo-tags", lambda n: point_at_trees(n, len(git.tags))
		)
	else:
		run_on_meld(tpl, "repo-tags", discard)


def parse_xml_file(template_name, using):
	"""
	Method: parse_xml_file

	Opens and parses one XML template. The file is "<template_name>.xml"
	inside the template folder of the given object, resolved relative to
	the directory containing this module.

	Parameters:

		template_name - The template name, without the ".xml" extension
		using - The Blueprint or Flask whose template_folder is used

	Returns:

		The parsed template, as returned by parse_xml
	"""
	module_dir = path.dirname(path.abspath(__file__))
	filename = "{}.xml".format(template_name)
	return parse_xml(path.join(module_dir, using.template_folder, filename))


def parse_xml_for_template(template_name, using=bp):
	"""
	Method: parse_xml_for_template

	Parses the "base" template and the named template, then replaces the
	base's "container" node with the children of the named template.

	Parameters:

		template_name - The inner template to embed
		using - The Blueprint or Flask to check for templates

	Returns:

		The combined base template
	"""
	page = parse_xml_file("base", using)
	fragment = parse_xml_file(template_name, using)
	slot = page.findmeld("container")
	slot.replace(fragment[:])
	return page


def write_template(doc):
	"""
	Method: write_template

	Serialises a parsed template as XHTML, with an XML declaration, the
	module-level doctype and UTF-8 encoding.

	Parameters:

		doc - The parsed template to serialise

	Returns:

		The serialised document, as produced by write_xhtmlstring
	"""
	options = {
		"doctype": doctype,
		"declaration": True,
		"encoding": "UTF-8",
	}
	return doc.write_xhtmlstring(**options)


@bp.route("/", methods=["GET"])
def index():
	"""
	Method: index

	Front page: lists the public repositories ordered by slug, each with
	its description and the time of the last commit on its master branch.

	Returns:

		The rendered page
	"""
	listing = session().run(Repo.public_repos()).order_by("slug")
	page = parse_xml_for_template("index")
	configure_template(page)
	for row, entry in page.findmeld("repo").repeat(listing):
		name_node = row.findmeld("repo-name")
		name_node.content(entry.name)
		name_node.set("href", url_for(".repo_home", slug=entry.slug))
		row.findmeld("repo-desc").content(entry.description)

		last_change = entry.get_master().commit.committed_datetime
		when_node = row.findmeld("repo-updated")
		# Relative time as the text, exact timestamp as the tooltip.
		when_node.content(arrow.get(last_change).humanize())
		when_node.set("title", last_change.isoformat(" "))
	return write_template(page)


def get_repo(slug):
	"""
	Method: get_repo

	Loads the repository record with the given slug, aborting the request
	with a 404 when there is none.

	Parameters:

		slug - The repository slug from the URL

	Returns:

		The matching repository record
	"""
	try:
		match = session().run(Repo.find_by_slug(slug)).one()
	except NoResultFound:
		abort(404)
	else:
		return match


def look_up_thing(slug, type, id):
	"""
	Method: look_up_thing

	Resolves the (slug, type, id) triple taken from a URL to a repository
	and a point in its history. Any failure aborts the request with 404.

	Parameters:

		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier

	Returns:

		A (repo, point, commit) tuple, where point is the branch, tag or
		commit that was named and commit is the commit it refers to
	"""
	repo = get_repo(slug)
	git = repo.repo
	if type in ("branch", "tag"):
		refs = git.heads if type == "branch" else git.tags
		# Match on the ref name instead of getattr(refs, id): id comes from
		# the URL, and getattr would also resolve ordinary list attributes
		# (e.g. "append"), turning a bad name into a 500 instead of a 404.
		# The links this module generates use the ref's name as the id.
		ref = next((r for r in refs if r.name == id), None)
		if ref is None:
			abort(404)
		return repo, ref, ref.commit
	if type == "commit":
		try:
			commit = git.commit(id)
		except Exception:
			# Any failure to resolve the id means "no such commit", but a
			# bare except would also swallow SystemExit/KeyboardInterrupt.
			abort(404)
		return repo, commit, commit
	abort(404)


@bp.route("/<slug>/", methods=["GET"])
def repo_home(slug):
	"""
	Method: repo_home

	Landing page of a repository: the top-level tree of its master branch.

	Parameters:

		slug - The repository slug

	Returns:

		The rendered page
	"""
	default_branch = get_repo(slug).get_master()
	return top_level_view(slug, "branch", default_branch.name)


def get_parent_diff(git, commit, **kwargs):
	"""
	Method: get_parent_diff

	Diffs a commit against its first parent. A commit without parents is
	diffed against the object named by EMPTY_TREE_SHA instead.

	Parameters:

		git - The repository, used to resolve EMPTY_TREE_SHA
		commit - The commit to diff
		kwargs - Passed through to the diff call

	Returns:

		The result of diffing the base against the commit
	"""
	if commit.parents:
		base = commit.parents[0]
	else:
		base = git.rev_parse(EMPTY_TREE_SHA)
	return base.diff(commit, **kwargs)


def get_diff_by_id(slug, id):
	"""
	Method: get_diff_by_id

	Looks up a commit and computes its patch against its parent.

	Parameters:

		slug - The repository slug
		id - The commit identifier

	Returns:

		A (repo, commit, diffs) tuple
	"""
	repo, _point, commit = look_up_thing(slug, "commit", id)
	patches = get_parent_diff(repo.repo, commit, create_patch=True)
	return repo, commit, patches


@bp.route("/<slug>/commit/<id>/diff/raw", methods=["GET"])
def raw_diff_view(slug, id):
	"""
	Method: raw_diff_view

	Serves the diff of a commit as plain text. Files whose diff is empty
	are left out.

	Parameters:

		slug - The repository slug
		id - The commit identifier

	Returns:

		A text/plain response
	"""
	_repo, _commit, diffs = get_diff_by_id(slug, id)

	def header(template, prefix, file_path):
		# A missing path (file added or removed) is shown as /dev/null.
		shown = prefix + file_path if file_path else "/dev/null"
		return template.format(shown).encode("utf-8")

	chunks = []
	for diff in diffs:
		if not diff.diff:
			continue
		chunks.append(b"\n".join((
			header("--- {}", "a/", diff.a_path),
			header("+++ {}", "b/", diff.b_path),
			diff.diff,
		)))

	body = b"".join(chunks)
	return make_response((body, {"Content-Type": "text/plain"}))


@bp.route("/<slug>/commit/<id>/diff", methods=["GET"])
def diff_view(slug, id):
	"""
	Method: diff_view

	Renders the page for one commit: its metadata followed by one section
	per changed file, each holding either the diff lines or a short
	message explaining why no diff is shown.

	Parameters:

		slug - The repository slug
		id - The commit identifier

	Returns:

		The rendered page
	"""
	repo, commit, diffs = get_diff_by_id(slug, id)
	tpl = parse_xml_for_template("repo-revision")
	configure_template(tpl, repo)
	# Commit header: the id links to the raw diff, the date to the history.
	tpl.findmeld("rev-id").content(commit.hexsha)
	tpl.findmeld("rev-id").set("href", url_for(
		".raw_diff_view", slug=slug, id=id
	))
	tpl.findmeld("rev-when").content(
		arrow.get(commit.committed_datetime).humanize()
	)
	tpl.findmeld("rev-when").set("href", url_for(
		".history_view", slug=slug, type="commit", id=id
	))
	tpl.findmeld("rev-when").set(
		"title", commit.committed_datetime.isoformat(" ")
	)
	tpl.findmeld("rev-desc").content(commit.message)
	tpl.findmeld("rev-author").content(nice_author(commit))

	for ele, diff in tpl.findmeld("file").repeat(diffs):
		# A non-empty message means "show this text instead of a diff".
		message = None if diff.diff else "No content change"
		if not diff.b_path:
			# No destination path: the file was removed, so the "to" node
			# becomes plain emphasised text rather than a link.
			ele.findmeld("file-from").content(diff.a_path)
			ele.findmeld("file-to").content("(removed)")
			ele.findmeld("file-to").tag = "em"
			ele.findmeld("file-to").attrib.pop("href")
		elif diff.a_path and diff.a_path != diff.b_path:
			# Source and destination paths differ: show both.
			ele.findmeld("file-from").content(diff.a_path)
			ele.findmeld("file-to").content(diff.b_path)
			if not diff.diff:
				# NOTE(review): this branch handles a path change, so
				# "File removed" looks like the wrong wording - confirm.
				message = "File removed"
		else:
			# Same path on both sides (or no source path): only the
			# destination is shown, linked to the file at this commit.
			ele.findmeld("file-change").deparent()
			ele.findmeld("file-to").content(diff.b_path)
			ele.findmeld("file-to").set("href", url_for(
				".tree_view", slug=slug, type="commit", id=id, path=diff.b_path
			))

		if not message:
			parts = diff.diff.split(b"\n")
			for holder, line in ele.findmeld("file-diff-line").repeat(parts):
				try:
					holder.content(line + b"\n")
				except ValueError:
					# The template rejected the line; give up on this file
					# and fall through to the message branch below.
					message = "This diff cannot be displayed"
					break
				# Class the line by its first character for styling.
				if line.startswith(b"+"):
					holder.set("class", "diff-add")
				elif line.startswith(b"-"):
					holder.set("class", "diff-del")
				elif line.startswith(b"@"):
					holder.set("class", "diff-hunk")
			else:
				# Runs only when the loop was not broken out of, i.e. every
				# line was accepted: the message placeholder is not needed.
				ele.findmeld("file-message").deparent()

		if message:
			# Show the message and drop the diff container, including any
			# lines written before a ValueError interrupted the loop.
			ele.findmeld("file-message")[0].content(message)
			ele.findmeld("file-diff").deparent()
	return write_template(tpl)


@bp.route("/<slug>/<type>/<id>/history", methods=["GET"])
def history_view(slug, type, id):
	"""
	Method: history_view

	Renders the commit log leading up to a branch, tag or commit: the
	resolved commit first, followed by everything iter_parents() yields.

	Parameters:

		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier

	Returns:

		The rendered page
	"""
	repo, _point, tip = look_up_thing(slug, type, id)

	page = parse_xml_for_template("repo-history")
	configure_template(page, repo)
	lineage = chain([tip], tip.iter_parents())
	for row, rev in page.findmeld("rev").repeat(lineage):
		# Each row gets an anchor so other pages can link straight to it.
		anchor = "rev-" + rev.hexsha
		row.set("id", anchor)

		desc_node = row.findmeld("rev-desc")
		desc_node.content(rev.message)
		desc_node.set("href", url_for(
			".diff_view", slug=slug, id=rev.hexsha
		))

		row.findmeld("rev-author").content(nice_author(rev))

		id_node = row.findmeld("rev-id")
		id_node.content(rev.hexsha)
		id_node.set("href", url_for(
			".top_level_view", slug=slug, type="commit", id=rev.hexsha
		))

		stamp = rev.committed_datetime
		when_node = row.findmeld("rev-when")
		when_node.content(arrow.get(stamp).humanize())
		when_node.set("href", url_for(
			".history_view", slug=slug, type=type, id=id, _anchor=anchor
		))
		when_node.set("title", stamp.isoformat(" "))

	# Link back to the file browser, labelled with the number of entries
	# at the top level of the tree.
	browse_node = page.findmeld("browse")
	browse_node.set("href", url_for(
		".top_level_view", slug=slug, type=type, id=id
	))
	entry_count = sum(1 for _ in chain(tip.tree.trees, tip.tree.blobs))
	browse_node[0].content(str(entry_count))

	return write_template(page)


def configure_breadcrumbs(tpl, slug, type, id, commit, path_parts):
	"""
	Method: configure_breadcrumbs

	Fills in the breadcrumb trail: first the branch/tag/commit id, linking
	to the top-level tree, then one crumb per non-empty path component,
	each linking to that directory.

	Parameters:

		tpl - The parsed template to update in place
		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier
		commit - The commit whose tree is being browsed
		path_parts - The path components leading to the current location
	"""
	def crumbs():
		yield id, False
		for part in path_parts:
			if part:
				yield part, True

	node = commit.tree
	for holder, (label, is_part) in tpl.findmeld("tree-part").repeat(crumbs()):
		link = holder.findmeld("tree-part-name")
		link.content(label)
		if not is_part:
			link.set("href", url_for(
				".top_level_view", slug=slug, type=type, id=id
			))
			continue
		# Descend one level so the link carries the full path so far.
		node = node.join(label)
		link.set("href", url_for(
			".tree_view", slug=slug, type=type, id=id, path=node.path
		))


def render_blob(ele, blob, dl_link):
	"""
	Method: render_blob

	Fills the file-container part of a template with a preview of a blob,
	or marks it as not viewable, with the reason as the node's title.

	A blob is previewed when its MIME type is text/* (or whitelisted), it
	is no larger than 1 MiB, its encoding can be guessed with confidence
	above 0.7, it decodes cleanly and the template accepts the text. An
	empty blob is previewed as empty text.

	Parameters:

		ele - The template (or element) containing the file-container nodes
		blob - The blob to preview
		dl_link - The URL the file name should link to
	"""
	whitelist = ("application/json", "application/javascript")
	max_preview_bytes = 1024 * 1024
	cannot_render = None
	data = None
	if not (blob.mime_type.startswith("text/") or blob.mime_type in whitelist):
		cannot_render = "MIME type {} is not previewable".format(blob.mime_type)
	elif blob.size > max_preview_bytes:
		cannot_render = "Content too large"
	else:
		raw = blob.data_stream.read()
		if not raw:
			# Nothing to detect an encoding from; an empty file is simply
			# empty text rather than an undecodable one.
			data = ""
		else:
			guess = chardet.detect(raw)
			# The "encoding" key is present even when detection failed (its
			# value is then None), so test the value, not the key.
			codec = guess.get("encoding")
			if codec and guess.get("confidence", 0) > 0.7:
				try:
					data = raw.decode(codec)
				except (UnicodeDecodeError, LookupError):
					# LookupError: chardet named a codec Python lacks.
					cannot_render = "Failed to convert to text"
			else:
				cannot_render = "Failed to guess content encoding"

	name_node = ele.findmeld("file-container-name")
	name_node.content(blob.name)
	name_node.set("href", dl_link)

	if not cannot_render:
		try:
			ele.findmeld("file-container-content").content(data)
		except ValueError:
			# The template refused the text (e.g. control characters).
			cannot_render = "Not XML safe"
		else:
			ele.findmeld("file-container-not-viewable").deparent()

	if cannot_render:
		ele.findmeld("file-container-content").deparent()
		ele.findmeld("file-container-not-viewable").set("title", cannot_render)


@bp.route("/<slug>/<type>/<id>/raw/<path:path>", methods=["GET"])
def raw_view(slug, type, id, path):
	"""
	Method: raw_view

	Serves one file from the repository as a download. Unknown paths and
	directories give a 404.

	Parameters:

		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier
		path - The path of the file inside the tree

	Returns:

		An attachment response carrying the file's contents
	"""
	_repo, _point, commit = look_up_thing(slug, type, id)
	try:
		entry = commit.tree[path]
	except KeyError:
		abort(404)
	else:
		if entry.type == "tree":
			abort(404)

	# Buffer the whole file in memory rather than handing over
	# data_stream itself: the blob goes out of scope too quickly for that.
	payload = BytesIO(entry.data_stream.read())
	return send_file(payload, entry.mime_type, True, entry.name)


@bp.route("/<slug>/<type>/<id>/tree/<path:path>", methods=["GET"])
def tree_view(slug, type, id, path):
	"""
	Method: tree_view

	Shows what lives at a path inside the repository: a directory listing
	when the path is a tree, otherwise a preview page for the file.

	Parameters:

		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier
		path - The path inside the tree

	Returns:

		The rendered page
	"""
	repo, _point, commit = look_up_thing(slug, type, id)
	try:
		entry = commit.tree[path]
	except KeyError:
		abort(404)

	if entry.type == "tree":
		return tree_base_view(slug, type, id, repo, commit, entry, False)

	page = parse_xml_for_template("repo-blob")
	configure_template(page, repo=repo)
	# Breadcrumbs cover the containing directories, not the file itself.
	parent_dirs = entry.path.split("/")[:-1]
	configure_breadcrumbs(page, slug, type, id, commit, parent_dirs)
	raw_link = url_for(
		".raw_view", slug=slug, type=type, id=id, path=entry.path
	)
	render_blob(page, entry, raw_link)
	return write_template(page)


@bp.route("/<slug>/<type>/<id>/tree", methods=["GET"])
def top_level_view(slug, type, id):
	"""
	Method: top_level_view

	Shows the root directory of a branch, tag or commit. The download
	link is only offered for branches and tags.

	Parameters:

		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier

	Returns:

		The rendered page
	"""
	repo, _point, commit = look_up_thing(slug, type, id)
	downloadable = type in ("branch", "tag")
	root = commit.tree
	return tree_base_view(slug, type, id, repo, commit, root, downloadable)


def tree_base_view(slug, type, id, repo, commit, tree, show_download=False):
	"""
	Method: tree_base_view

	Renders a directory listing: repository header, breadcrumbs, one row
	per sub-directory and file (files with size and latest change) and,
	when the directory holds a suitable README, a preview of it.

	Parameters:

		slug - The repository slug
		type - One of "branch", "tag" or "commit"
		id - The branch name, tag name or commit identifier
		repo - The repository record
		commit - The commit being browsed
		tree - The directory (tree) within that commit to list
		show_download - Whether to offer the archive download link

	Returns:

		The rendered page
	"""
	# Walk the commit and everything iter_parents() yields, recording for
	# each path the commit with the greatest committed_date that added or
	# modified it. The walk also yields the total commit count.
	by_file = {}
	commit_count = 0
	for current in chain([commit], commit.iter_parents()):
		commit_count += 1
		for diff in get_parent_diff(repo.repo, current):
			# Substring test against "AM": keeps change types "A" and "M".
			if diff.change_type not in "AM":
				continue
			if diff.b_path in by_file:
				if by_file[diff.b_path].committed_date < current.committed_date:
					by_file[diff.b_path] = current
			else:
				by_file[diff.b_path] = current

	tpl = parse_xml_for_template("repo-home")
	configure_template(tpl, repo)
	tpl.findmeld("repo-name").content(repo.name)
	tpl.findmeld("repo-desc").content(repo.description)
	tpl.findmeld("repo-commits")[0].content(str(commit_count))
	tpl.findmeld("repo-commits").set("href", url_for(
		".history_view", slug=slug, type=type, id=id
	))
	if show_download:
		tpl.findmeld("repo-download").set("href", url_for(
			".download_view", slug=slug, type=type, id=id
		))
		tpl.findmeld("repo-download").content(
			make_tar_filename(slug, type, id, commit)
		)
	else:
		# Without a download link the clone URL is removed as well.
		tpl.findmeld("repo-download").deparent()
		tpl.findmeld("repo-clone").deparent()

	configure_breadcrumbs(tpl, slug, type, id, commit, tree.path.split("/"))
	# Sub-directories are listed before files.
	contents = chain(tree.trees, tree.blobs)
	for ele, direntry in tpl.findmeld("file").repeat(contents):
		ele.findmeld("file-name").set("href", url_for(
			".tree_view", slug=slug, type=type, id=id, path=direntry.path
		))
		if direntry.type == "blob":
			ele.findmeld("file-name").content(direntry.name)
			ele.findmeld("file-size").content(nice_size(direntry.size))
			# Files the history walk found no add/modify for keep whatever
			# the template holds in these nodes.
			if direntry.path in by_file:
				file_commit = by_file[direntry.path]
				ele.findmeld("file-revdesc").content(file_commit.message)
				ele.findmeld("file-revdesc").set("href", url_for(
					".diff_view", slug=slug, id=file_commit.hexsha
				))
				ele.findmeld("file-revdesc").set("title", file_commit.hexsha)
				ele.findmeld("file-when").content(
					arrow.get(file_commit.committed_datetime).humanize()
				)
				ele.findmeld("file-when").set("href", url_for(
					".history_view", slug=slug, type=type,
					id=id, _anchor="rev-" + file_commit.hexsha
				))
				ele.findmeld("file-when").set(
					"title", file_commit.committed_datetime.isoformat(" ")
				)
		else:
			# Directories: trailing slash on the name, no size, date or
			# commit description.
			ele.findmeld("file-name").content(direntry.name + "/")
			ele.findmeld("file-size").content("")
			ele.findmeld("file-when").content("")
			ele.findmeld("file-when").attrib.pop("title")
			ele.findmeld("file-revdesc").deparent()

	# README.*, case-insensitive, text content
	# ("readme" alone or "readme.<ext>" with a single extension)
	candidate_readmes = [
		c for c in tree.blobs if
		c.mime_type.startswith("text/") and
		(c.name.lower() == "readme" or (
			c.name.lower().startswith("readme.") and
			"." not in c.name[7:]
		))
	]
	preferred = ("readme.txt", "readme.md", "readme")
	# The key is False for preferred names, so they sort ahead of the
	# rest; the order within each group is left as it was.
	candidate_readmes.sort(
		key=lambda c: not c.name.lower() in preferred
	)
	if candidate_readmes:
		readme = candidate_readmes[0]
		render_blob(tpl, readme, url_for(
			".tree_view", slug=slug, type=type, id=id, path=readme.path
		))
	else:
		tpl.findmeld("file-container").deparent()
	return write_template(tpl)


def make_tar_filename(slug, type, id, commit):
	"""
	Method: make_tar_filename

	Builds the file name offered for an archive download. Branch archives
	carry the date of the commit they were taken from; others do not.

	Parameters:

		slug - The repository slug
		type - The kind of reference ("branch" adds the date)
		id - The branch or tag name
		commit - The commit being archived (only read for branches)

	Returns:

		A name such as "slug-master-2020-01-02.tar.gz" or "slug-v1.tar.gz"
	"""
	date_suffix = ""
	if type == "branch":
		day = commit.committed_datetime.date()
		date_suffix = "-{}".format(day.isoformat())
	return "{}-{}{}.tar.gz".format(slug, id, date_suffix)


@bp.route("/<slug>/<type>/<id>/download", methods=["GET"])
def download_view(slug, type, id):
	"""
	Method: download_view

	Builds a gzipped tar archive of a branch or tag in a temporary file
	and serves it as a download. Directories are visited breadth-first,
	each directory entry being written before the entries it contains.

	Parameters:

		slug - The repository slug
		type - "branch" or "tag"; anything else gives a 404
		id - The branch or tag name

	Returns:

		An attachment response carrying the archive
	"""
	from collections import deque

	if type not in ("branch", "tag"):
		abort(404)
	_, _, commit = look_up_thing(slug, type, id)

	def make_ti(thing, kind, mode):
		# Every entry is owned by root and dated with the commit time.
		ti = TarInfo(thing.path)
		ti.size = getattr(thing, "size", 0)
		ti.mode = mode
		ti.type = kind
		ti.mtime = commit.committed_date
		ti.uid = 0
		ti.gid = 0
		ti.uname = "root"
		ti.gname = "root"
		return ti

	tar = TemporaryFile("w+b")
	try:
		with TarFile.open(mode="x:gz", fileobj=tar) as handle:
			dirs = deque([commit.tree])
			while dirs:
				currdir = dirs.popleft()
				# The root tree has an empty path and gets no entry.
				if currdir.path:
					handle.addfile(make_ti(currdir, DIRTYPE, 0o755))
				dirs.extend(currdir.trees)
				for blob in currdir.blobs:
					handle.addfile(
						make_ti(blob, REGTYPE, blob.mode), blob.data_stream
					)

		tar.seek(0)
		return send_file(tar, "application/tar+gz", True, make_tar_filename(
			slug, type, id, commit))
	except BaseException:
		# Once send_file succeeds the response owns the file; until then a
		# failure must not leave the temporary file open.
		tar.close()
		raise


@bp.route("/<slug>/trees", methods=["GET"])
def trees_view(slug):
	"""
	Method: trees_view

	Lists every branch and then every tag of a repository, each with its
	latest commit, a link to its history and an archive download link.

	Parameters:

		slug - The repository slug

	Returns:

		The rendered page
	"""
	repo = get_repo(slug)
	git = repo.repo
	page = parse_xml_for_template("repo-trees")
	configure_template(page, repo)

	rows = [(head, "Branch") for head in git.heads]
	rows += [(tag, "Tag") for tag in git.tags]
	for row, (ref, label) in page.findmeld("branch").repeat(rows):
		# The lower-cased label doubles as the type segment of the URLs.
		kind = label.lower()
		tip = ref.commit

		row.findmeld("branch-type").content(label)

		name_node = row.findmeld("branch-name")
		name_node.content(ref.name)
		name_node.set("href", url_for(
			".top_level_view", slug=slug, type=kind, id=ref.name
		))

		desc_node = row.findmeld("branch-revdesc")
		desc_node.content(tip.message)
		desc_node.set("href", url_for(
			".diff_view", slug=slug, id=tip.hexsha
		))

		when_node = row.findmeld("branch-when")
		when_node.content(arrow.get(tip.committed_datetime).humanize())
		when_node.set("href", url_for(
			".history_view", slug=slug, type=kind, id=ref.name
		))
		when_node.set("title", tip.committed_datetime.isoformat(" "))

		download_node = row.findmeld("branch-download")
		download_node.set("href", url_for(
			".download_view", slug=slug, type=kind, id=ref.name
		))
		download_node.content(make_tar_filename(slug, kind, ref.name, tip))
	return write_template(page)