Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Pascal MOLIN
lektor-commit
Commits
559b5ec4
Commit
559b5ec4
authored
Jan 08, 2021
by
Pascal
Browse files
improve git facilities
parent
68e70c25
Changes
7
Hide whitespace changes
Inline
Side-by-side
git.py
View file @
559b5ec4
...
...
@@ -2,6 +2,16 @@
import
shlex
import
subprocess
class
Log
:
def
__init__
(
self
,
line
):
self
.
line
=
line
self
.
sha
,
self
.
author
,
self
.
message
=
line
.
split
(
';'
,
maxsplit
=
2
)
def
__repr__
(
self
):
return
self
.
line
@
classmethod
def
map_stream
(
cls
,
stream
):
return
[
cls
(
l
)
for
l
in
stream
.
decode
(
'utf-8'
).
splitlines
()]
def
git_decode
(
letter
):
code
=
{
' '
:
'unmodified'
,
...
...
@@ -40,69 +50,108 @@ class Status(list):
#super().__init__()
super
().
__init__
(
files
)
class
Repo
:
cmd_list
=
[]
def
__init__
(
self
,
path
):
self
.
path
=
path
for
cls
,
cmd
in
self
.
cmd_list
:
setattr
(
self
,
cmd
,
cls
(
self
))
@
classmethod
def
command
(
this
,
name
):
def
wrapper
(
cls
):
this
.
cmd_list
.
append
((
cls
,
name
))
return
cls
return
wrapper
class
GitCommand
(
object
):
"""
TODO: consider turning this class
into a function,
using __new__(*args)
instead of __call__(self, *args)
Base for git commands, to be instantiated
in a git repo.
"""
git_cmd
=
[]
def
__init__
(
self
,
repo
):
self
.
repo
=
repo
def
git_args
(
self
,
*
args
):
return
[]
def
result
(
self
,
rv
):
return
rv
.
returncode
==
0
"""
convert args to actual git command arguments
"""
return
list
(
args
)
def
before
(
self
):
"""
actions to be run before the git command
(e.g. other git commands to be in a particular state)
"""
pass
def
after
(
self
):
"""
actions to be run after the git command
(e.g. to restore tree state)
"""
pass
def
check_before
(
self
):
"""
checks to be done before
"""
return
True
def
result
(
self
,
rv
):
"""
postprocess the CompletedProcess value
"""
return
rv
.
returncode
==
0
def
__call__
(
self
,
*
args
,
**
kwargs
):
self
.
before
()
assert
self
.
check_before
()
args
=
self
.
git_cmd
+
self
.
git_args
(
*
args
)
rv
=
subprocess
.
check_output
(
args
)
rv
=
subprocess
.
run
(
args
,
cwd
=
self
.
repo
.
path
,
check
=
True
,
stdout
=
subprocess
.
PIPE
)
self
.
after
()
return
self
.
result
(
rv
)
def
instantiate
(
cls
):
return
cls
()
@
instantiate
class
git_skip
(
GitCommand
):
@
Repo
.
command
(
'noop'
)
class
GitNoOp
(
GitCommand
):
pass
@
instantiate
class
git_add
(
GitCommand
):
git_cmd
=
[
'git'
,
'add'
]
def
git_args
(
self
,
path
):
return
[
path
]
@
instantiate
class
git_add_tree
(
GitCommand
):
git_cmd
=
[
'git'
,
'add'
,
'-u'
,
':/'
]
@
instantiate
class
git_status
(
GitCommand
):
@
Repo
.
command
(
'status'
)
class
GitStatus
(
GitCommand
):
git_cmd
=
[
'git'
,
'status'
,
'--porcelain'
]
def
result
(
self
,
rv
):
return
Status
(
rv
.
decode
(
'utf-8'
).
splitlines
())
return
Status
(
rv
.
stdout
.
decode
(
'utf-8'
).
splitlines
())
@
Repo
.
command
(
'log'
)
class
GitLog
(
GitCommand
):
git_cmd
=
[
'git'
,
'log'
,
'--pretty="%h;%an;%s"'
]
def
result
(
self
,
rv
):
return
Log
.
map_stream
(
rv
.
stdout
)
@
Repo
.
command
(
'add'
)
class
GitAdd
(
GitCommand
):
git_cmd
=
[
'git'
,
'add'
]
@
Repo
.
command
(
'add_tree'
)
class
GitAddTree
(
GitCommand
):
"""
adds all files in working directory,
new and deleted ones
"""
git_cmd
=
[
'git'
,
'add'
,
'--all'
]
@
Repo
.
command
(
'commit'
)
class
GitCommit
(
GitCommand
):
git_cmd
=
[
'git'
,
'commit'
]
def
git_args
(
self
,
message
,
author
):
author
=
shlex
.
quote
(
author
)
return
[
'-m'
,
message
,
'--author="%s"'
%
(
author
)]
@
instantiate
class
g
it
_c
ommit
_i
ndex
(
GitCommit
):
@
Repo
.
command
(
'commit_index'
)
class
G
it
C
ommit
I
ndex
(
GitCommit
):
pass
@
instantiate
class
g
it
_c
ommit
_t
ree
(
GitCommit
):
@
Repo
.
command
(
'commit_tree'
)
class
G
it
C
ommit
T
ree
(
GitCommit
):
git_cmd
=
[
'git'
,
'commit'
]
def
check_before
(
self
):
return
GitAddT
ree
()
return
self
.
repo
.
add_t
ree
()
#@instantiate
#class GitCheckUpstream(GitCommand):
# """
# git fetch origin master
...
...
lektor_commit.py
View file @
559b5ec4
...
...
@@ -9,9 +9,8 @@ from flask import Blueprint, \
Markup
,
\
has_app_context
,
\
has_request_context
from
lektor.pluginsystem
import
Plugin
from
lektor.pluginsystem
import
Plugin
,
get_plugin
from
lektor.publisher
import
Publisher
from
lektor.pluginsystem
import
get_plugin
from
lektor.admin.modules
import
serve
,
api
from
git
import
git_add
,
git_status
,
git_commit_tree
...
...
@@ -72,7 +71,7 @@ class GitPushPublisher(CommitPublisher):
caveat: the publish method does not have access to the request
which triggered it, hence the publisher name.
As a tentative workaround, we
let flask
store the user associated
As a tentative workaround, we
must
store the user associated
to the last valid /publish request
"""
def
publish
(
self
,
target_url
,
credentials
=
None
,
**
extra
):
...
...
@@ -118,3 +117,7 @@ class CommitPlugin(Plugin):
# # can use request
# return response
def
on_lektor_login_access_api
(
self
,
*
args
,
**
extra
):
raise
Exception
(
'Cool'
)
def
on_login_access_api
(
self
,
*
args
,
**
extra
):
raise
Exception
(
'Cool'
)
lektor_commit/__init__.py
0 → 100644
View file @
559b5ec4
lektor_commit/git.py
0 → 100644
View file @
559b5ec4
# -*- coding: utf-8 -*-
import
shlex
import
subprocess
def
git_decode
(
letter
):
code
=
{
' '
:
'unmodified'
,
'M'
:
'modified'
,
'A'
:
'added'
,
'D'
:
'deleted'
,
'U'
:
'unmerged'
,
'?'
:
'untracked'
}
return
code
.
get
(
letter
,
None
)
class
PathStatus
:
"""
File status as output by git status --porcelain
TODO: consider using porcelain=v2
"""
def
__init__
(
self
,
line
,
number
):
self
.
number
=
number
self
.
line
=
line
self
.
index
=
git_decode
(
line
[
0
])
self
.
tree
=
git_decode
(
line
[
1
])
self
.
path
=
line
[
3
:]
def
__repr__
(
self
):
return
self
.
line
class
Status
(
list
):
states
=
[
(
'changed'
,
'modifiés non publiés'
),
(
'saved'
,
'validés pour publication'
)
]
actions
=
[
(
'save'
,
'valider'
),
(
'skip'
,
'ignorer'
),
(
'discard'
,
'abandonner'
)
]
def
__init__
(
self
,
gitstatus
):
files
=
[
PathStatus
(
l
,
i
)
for
i
,
l
in
enumerate
(
gitstatus
)
]
#super().__init__()
super
().
__init__
(
files
)
class
GitCommand
(
object
):
"""
TODO: consider turning this class
into a function,
using __new__(*args)
instead of __call__(self, *args)
"""
git_cmd
=
[]
def
git_args
(
self
,
*
args
):
return
[]
def
result
(
self
,
rv
):
return
rv
.
returncode
==
0
def
before
(
self
):
pass
def
check_before
(
self
):
return
True
def
__call__
(
self
,
*
args
,
**
kwargs
):
self
.
before
()
assert
self
.
check_before
()
args
=
self
.
git_cmd
+
self
.
git_args
(
*
args
)
rv
=
subprocess
.
check_output
(
args
)
return
self
.
result
(
rv
)
def
instantiate
(
cls
):
return
cls
()
@
instantiate
class
git_skip
(
GitCommand
):
pass
@
instantiate
class
git_add
(
GitCommand
):
git_cmd
=
[
'git'
,
'add'
]
def
git_args
(
self
,
path
):
return
[
path
]
@
instantiate
class
git_add_tree
(
GitCommand
):
git_cmd
=
[
'git'
,
'add'
,
'-u'
,
':/'
]
@
instantiate
class
git_status
(
GitCommand
):
git_cmd
=
[
'git'
,
'status'
,
'--porcelain'
]
def
result
(
self
,
rv
):
return
Status
(
rv
.
decode
(
'utf-8'
).
splitlines
())
class
GitCommit
(
GitCommand
):
git_cmd
=
[
'git'
,
'commit'
]
def
git_args
(
self
,
message
,
author
):
author
=
shlex
.
quote
(
author
)
return
[
'-m'
,
message
,
'--author="%s"'
%
(
author
)]
@
instantiate
class
git_commit_index
(
GitCommit
):
pass
@
instantiate
class
git_commit_tree
(
GitCommit
):
git_cmd
=
[
'git'
,
'commit'
]
def
check_before
(
self
):
return
GitAddTree
()
#@instantiate
#class GitCheckUpstream(GitCommand):
# """
# git fetch origin master
# git merge-tree `git merge-base FETCH_HEAD master` FETCH_HEAD master
# """
# branch = 'master'
# origin = 'origin'
# class GitMergeBase(GitCommand):
# git_cmd = ['git', 'merge-base', 'FETCH_HEAD', branch]
# class GitMergeTree(GitCommand):
# git_cmd = ['git', 'merge-tree']
# def git_args(self, sha):
# sha = merge_base()
# return []
# fetch = GitFetch()
# merge_base = GitMergeBase()
# merge_tree = GitMergeTree()
# def __call__(self, *args):
# self.fetch()
# sha = self.merge_base()
# return self.merge_tree(sha)
lektor_commit/lektor_commit.py
0 → 100644
View file @
559b5ec4
# -*- coding: utf-8 -*-
import
os
import
shutil
import
subprocess
from
flask
import
Blueprint
,
\
current_app
,
\
render_template
,
\
url_for
,
\
Markup
,
\
has_app_context
,
\
has_request_context
from
lektor.pluginsystem
import
Plugin
from
lektor.publisher
import
Publisher
from
lektor.pluginsystem
import
get_plugin
from
lektor.admin.modules
import
serve
,
api
from
git
import
git_add
,
git_status
,
git_commit_tree
gitbp
=
Blueprint
(
'git'
,
__name__
,
url_prefix
=
'/git'
,
static_folder
=
'static'
,
template_folder
=
'templates'
)
@
gitbp
.
route
(
'/status'
,
methods
=
[
'GET'
])
def
status
():
values
=
git_status
()
return
render_template
(
"status.html"
,
status
=
values
)
#@gitbp.route('/add',methods=['POST'])
#def add():
# path = request.values.get('path', None)
# if path:
# rv = git_add(path)
# return "done"
# return "failed"
#
#@gitbp.route('/commit',methods=['POST'])
#def commit():
# #author = request.values.get(
# rv = git_commit()
# #subprocess.call(['git', 'commit', '--author="John Doe <john@doe.org>"'])
# return "failed"
class
CommitPublisher
(
Publisher
):
"""
commit all changes from current tree
get username from session cookie
"""
def
publish
(
self
,
target_url
,
credentials
=
None
,
**
extra
):
rv
=
git_commit_tree
()
yield
rv
class
AskReviewPublisher
(
CommitPublisher
):
"""
commit changes and ask for review before push
"""
def
send_mail
(
self
,
status
):
#mail -s "Test" pascal.molin@math.univ-paris-diderot.fr < /dev/null
pass
def
publish
(
self
,
target_url
,
credentials
=
None
,
**
extra
):
status
=
git_status
()
yield
'%d fichiers modifiés'
for
s
in
status
:
yield
str
(
s
)
self
.
send_mail
(
status
)
yield
'Message envoyé'
yield
'Les changements seront publiés après validation.'
class
GitPushPublisher
(
CommitPublisher
):
"""
commit modified files and push
caveat: the publish method does not have access to the request
which triggered it, hence the publisher name.
As a tentative workaround, we let flask store the user associated
to the last valid /publish request
"""
def
publish
(
self
,
target_url
,
credentials
=
None
,
**
extra
):
#src_path = self.output_path
#dst_path = target_url.path
#self.fail("Vous n'avez pas le droit de publier directement, \
# sélectionnez plutôt «signaler les changements pour publication»")
app
=
current_app
login
=
get_plugin
(
'login'
,
self
.
env
)
with
app
.
app_context
():
user
=
login
.
get_auth_user
()
if
user
is
not
None
and
user
.
get
(
'publish'
,
False
):
msg
=
'user %s'
%
user
[
'username'
]
yield
msg
yield
'Done'
self
.
fail
(
'Please commit all changes before publishing'
)
class
CommitPlugin
(
Plugin
):
name
=
'lektor-commit'
description
=
u
'Publish by git commit+push, or ask review.'
def
on_setup_env
(
self
,
*
args
,
**
extra
):
self
.
env
.
add_publisher
(
'askreview'
,
AskReviewPublisher
)
self
.
env
.
add_publisher
(
'gitpush'
,
GitPushPublisher
)
@
serve
.
bp
.
before_app_first_request
def
setup_git_bp
():
app
=
current_app
app
.
register_blueprint
(
gitbp
)
login
=
get_plugin
(
'login'
,
self
.
env
)
login
.
add_button
(
url_for
(
'git.status'
),
'see all changes'
,
Markup
(
'<i class="fa fa-save"></i>'
))
## use to add/commit after each use
#@api.bp.after_request
##pylint: disable=unused-variable
#def after_request_api(response):
# # add modification to user cookie
# # can use request
# return response
test/conftest.py
0 → 100644
View file @
559b5ec4
import
os
import
sys
import
pytest
import
shutil
import
tempfile
import
json
import
requests
import
subprocess
import
time
from
requests
import
Session
from
urllib.parse
import
urljoin
from
configparser
import
ConfigParser
sys
.
path
.
insert
(
0
,
os
.
path
.
abspath
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
)))
from
git
import
Repo
def
pytest_addoption
(
parser
):
parser
.
addini
(
'project'
,
'path of a the lektor test project'
)
parser
.
addini
(
'packages'
,
'plugins path to add'
,
'linelist'
)
# dunno how pathlist is parsed
parser
.
addini
(
'port'
,
'http server port'
,
default
=
'5787'
)
@
pytest
.
fixture
(
scope
=
'session'
)
def
project_path
(
request
):
return
request
.
config
.
getini
(
'project'
)
@
pytest
.
fixture
(
scope
=
'session'
)
def
port
(
request
):
return
int
(
request
.
config
.
getini
(
'port'
))
@
pytest
.
fixture
(
scope
=
'session'
)
def
packages
(
request
):
return
request
.
config
.
getini
(
'packages'
)
@
pytest
.
fixture
(
scope
=
'session'
)
def
lektorproject
(
project_path
,
packages
):
try
:
output_path
=
tempfile
.
mkdtemp
()
print
(
'OUT: '
,
output_path
)
# copy main repo
print
(
'PROJECT: '
,
project_path
)
# copy with git repo
print
(
'COPY project'
)
shutil
.
copytree
(
project_path
,
output_path
,
dirs_exist_ok
=
True
)
def
ignore
(
path
,
names
):
# no symlink
remove
=
set
(
name
for
name
in
names
if
name
in
[
'.git'
,
'test'
,
'content'
,
'__pycache__'
,
'.pytest_cache'
]
or
name
.
endswith
(
'.egg-info'
)
or
os
.
path
.
islink
(
os
.
path
.
join
(
path
,
name
))
)
print
(
'folder %s -> remove %s'
%
(
path
,
remove
))
return
remove
# add local modifications
print
(
'COPY local folders, except symlinks'
)
if
os
.
path
.
isdir
(
'test/site'
):
shutil
.
copytree
(
'test/site'
,
output_path
,
ignore
=
ignore
,
dirs_exist_ok
=
True
)
# add packages and current plugin
#testdir = shutil.ignore_patterns('test', '.git', '.egg*', '__pycache*')
for
p
in
packages
+
[
os
.
getcwd
()
]:
print
(
'COPY package %s'
%
p
)
p
=
os
.
path
.
normpath
(
p
)
name
=
os
.
path
.
basename
(
p
)
shutil
.
copytree
(
p
,
os
.
path
.
join
(
output_path
,
'packages'
,
name
),
ignore
=
ignore
,
dirs_exist_ok
=
True
)
except
(
OSError
,
IOError
)
as
e
:
pytest
.
exit
(
'FATAL: could not copy test site directory. %s'
,
e
)
# error
yield
output_path
try
:
shutil
.
rmtree
(
output_path
)
except
(
OSError
,
IOError
):
pass
@
pytest
.
fixture
(
scope
=
'session'
)
def
git
(
lektorproject
):
repo
=
Repo
(
lektorproject
)
yield
repo
class
BaseUrlSession
(
requests
.
Session
):
# https://github.com/requests/toolbelt/blob/master/requests_toolbelt/sessions.py
def
__init__
(
self
,
base_url
=
None
):
if
base_url
:
self
.
base_url
=
base_url
super
(
BaseUrlSession
,
self
).
__init__
()
def
request
(
self
,
method
,
url
,
*
args
,
**
kwargs
):
"""Send the request after generating the complete URL."""
url
=
self
.
create_url
(
url
)
return
super
(
BaseUrlSession
,
self
).
request
(
method
,
url
,
*
args
,
**
kwargs
)
def
create_url
(
self
,
url
):
"""Create the URL based off this partial path."""
return
urljoin
(
self
.
base_url
,
url
)
@
pytest
.
fixture
(
scope
=
'module'
)
def
server
(
lektorproject
,
port
):
servercmd
=
'lektor server -p %d'
%
port
print
(
"[START LEKTOR SERVER]"
)
server
=
subprocess
.
Popen
([
"lektor"
,
"server"
,
"-p %d"
%
port
],
cwd
=
lektorproject
)
server
.
base_url
=
'http://localhost:%d'
%
port
time
.
sleep
(
4
)
#while True:
# try:
# requests.get(URL+'/', timeout=.2)
# break
# except requests.exceptions.Timeout:
# print("wait server")
# pass
yield
server
print
(
"[HALT LEKTOR SERVER]"
)
server
.
kill
()
def
login
(
client
,
name
,
**
kwargs
):
password
=
kwargs
.
pop
(
'password'
,
name
)
url
=
kwargs
.
pop
(
'url'
,
'/admin/root/edit'
)
return
client
.
post
(
'/auth/login'
,
data
=
dict
(
username
=
name
,
password
=
password
,
url
=
url
),
**
kwargs
)
def
logout
(
client
,
**
kwargs
):
return
client
.
get
(
'/auth/logout'
,
**
kwargs
)
@
pytest
.
fixture
(
scope
=
'function'
)
def
anonymous
(
server
):
session
=
BaseUrlSession
(
base_url
=
server
.
base_url
)
yield
session
session
.
close
()
@
pytest
.
fixture
(
scope
=
'function'
)
def
admin
(
server
):
session
=
BaseUrlSession
(
base_url
=
server
.
base_url
)
login
(
session
,
'admin'
)
yield
session
session
.
close
()
@
pytest
.
fixture
(
scope
=
'function'
)
def
blog
(
server
):
session
=
BaseUrlSession
(
base_url
=
server
.
base_url
)
login
(
session
,
'blog'
)
yield
session
session
.
close
()
@
pytest
.
fixture
(
scope
=
'function'
)
def
test
(
server
):
session
=
BaseUrlSession
(
base_url
=
server
.
base_url
)
login
(
session
,
'test'
)
yield
session
session
.
close
()
@
pytest
.
fixture
(
scope
=
'module'
)
def
project
(
request
,
lektorproject
):
from
lektor.project
import
Project
return
Project
.
from_path
(
lektorproject
)
@
pytest
.
fixture
(
scope
=
'module'
)
def
env
(
request
,
project
):
env
=
project
.
make_env
()
return
env
@
pytest
.
fixture
(
scope
=
'function'
)
def
pad
(
request
,
env
):
from
lektor.db
import
Database
return
Database
(
env
).
new_pad
()
@
pytest
.
fixture
(
scope
=
'module'
)
def
webui
(
request
,
env
):
from
lektor.admin.webui
import
WebUI
output_path
=
tempfile
.
mkdtemp
()
def
cleanup
():
try
:
shutil
.
rmtree
(
output_path
)
except
(
OSError
,
IOError
):
pass
request
.
addfinalizer
(
cleanup
)
return
WebUI
(
env
,
debug
=
True
,
output_path
=
output_path
)
@
pytest
.
fixture
(
scope
=
'module'
)