mirror of
https://github.com/fergalmoran/flameshot.git
synced 2025-12-22 09:51:06 +00:00
389 lines
13 KiB
Python
389 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
|
|
#
|
|
# Copyright (c) 2018-2020 Leonardo Taccari
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions
|
|
# are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# 2. Redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
|
|
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
|
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS
|
|
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
#
|
|
|
|
|
|
"""
|
|
Download/upload files via wetransfer.com
|
|
|
|
transferwee is a script/module to download/upload files via wetransfer.com.
|
|
|
|
It exposes `download' and `upload' subcommands, respectively used to download
|
|
files from a `we.tl' or `wetransfer.com/downloads' URLs and upload files that
|
|
will be shared via emails or link.
|
|
"""
|
|
|
|
from typing import List
|
|
import os.path
|
|
import re
|
|
import urllib.parse
|
|
import zlib
|
|
|
|
import requests
|
|
|
|
|
|
WETRANSFER_API_URL = 'https://wetransfer.com/api/v4/transfers'
|
|
WETRANSFER_DOWNLOAD_URL = WETRANSFER_API_URL + '/{transfer_id}/download'
|
|
WETRANSFER_UPLOAD_EMAIL_URL = WETRANSFER_API_URL + '/email'
|
|
WETRANSFER_VERIFY_URL = WETRANSFER_API_URL + '/{transfer_id}/verify'
|
|
WETRANSFER_UPLOAD_LINK_URL = WETRANSFER_API_URL + '/link'
|
|
WETRANSFER_FILES_URL = WETRANSFER_API_URL + '/{transfer_id}/files'
|
|
WETRANSFER_PART_PUT_URL = WETRANSFER_FILES_URL + '/{file_id}/part-put-url'
|
|
WETRANSFER_FINALIZE_MPP_URL = WETRANSFER_FILES_URL + '/{file_id}/finalize-mpp'
|
|
WETRANSFER_FINALIZE_URL = WETRANSFER_API_URL + '/{transfer_id}/finalize'
|
|
|
|
WETRANSFER_DEFAULT_CHUNK_SIZE = 5242880
|
|
WETRANSFER_EXPIRE_IN = 604800
|
|
|
|
|
|
def download_url(url: str) -> str:
|
|
"""Given a wetransfer.com download URL download return the downloadable URL.
|
|
|
|
The URL should be of the form `https://we.tl/' or
|
|
`https://wetransfer.com/downloads/'. If it is a short URL (i.e. `we.tl')
|
|
the redirect is followed in order to retrieve the corresponding
|
|
`wetransfer.com/downloads/' URL.
|
|
|
|
The following type of URLs are supported:
|
|
- `https://we.tl/<short_url_id>`:
|
|
received via link upload, via email to the sender and printed by
|
|
`upload` action
|
|
- `https://wetransfer.com/<transfer_id>/<security_hash>`:
|
|
directly not shared in any ways but the short URLs actually redirect to
|
|
them
|
|
- `https://wetransfer.com/<transfer_id>/<recipient_id>/<security_hash>`:
|
|
received via email by recipients when the files are shared via email
|
|
upload
|
|
|
|
Return the download URL (AKA `direct_link') as a str or None if the URL
|
|
could not be parsed.
|
|
"""
|
|
# Follow the redirect if we have a short URL
|
|
if url.startswith('https://we.tl/'):
|
|
r = requests.head(url, allow_redirects=True)
|
|
url = r.url
|
|
|
|
recipient_id = None
|
|
params = urllib.parse.urlparse(url).path.split('/')[2:]
|
|
|
|
if len(params) == 2:
|
|
transfer_id, security_hash = params
|
|
elif len(params) == 3:
|
|
transfer_id, recipient_id, security_hash = params
|
|
else:
|
|
return None
|
|
|
|
j = {
|
|
"intent": "entire_transfer",
|
|
"security_hash": security_hash,
|
|
}
|
|
if recipient_id:
|
|
j["recipient_id"] = recipient_id
|
|
s = _prepare_session()
|
|
r = s.post(WETRANSFER_DOWNLOAD_URL.format(transfer_id=transfer_id),
|
|
json=j)
|
|
|
|
j = r.json()
|
|
return j.get('direct_link')
|
|
|
|
|
|
def _file_unquote(file: str) -> str:
|
|
"""Given a URL encoded file unquote it.
|
|
|
|
All occurences of `\', `/' and `../' will be ignored to avoid possible
|
|
directory traversals.
|
|
"""
|
|
return urllib.parse.unquote(file).replace('../', '').replace('/', '').replace('\\', '')
|
|
|
|
|
|
def download(url: str, file: str = '') -> None:
|
|
"""Given a `we.tl/' or `wetransfer.com/downloads/' download it.
|
|
|
|
First a direct link is retrieved (via download_url()), the filename can be
|
|
provided via the optional `file' argument. If not provided the filename
|
|
will be extracted to it and it will be fetched and stored on the current
|
|
working directory.
|
|
"""
|
|
dl_url = download_url(url)
|
|
if not file:
|
|
file = _file_unquote(urllib.parse.urlparse(dl_url).path.split('/')[-1])
|
|
|
|
r = requests.get(dl_url, stream=True)
|
|
with open(file, 'wb') as f:
|
|
for chunk in r.iter_content(chunk_size=1024):
|
|
f.write(chunk)
|
|
|
|
|
|
def _file_name_and_size(file: str) -> dict:
|
|
"""Given a file, prepare the "name" and "size" dictionary.
|
|
|
|
Return a dictionary with "name" and "size" keys.
|
|
"""
|
|
filename = os.path.basename(file)
|
|
filesize = os.path.getsize(file)
|
|
|
|
return {
|
|
"name": filename,
|
|
"size": filesize
|
|
}
|
|
|
|
|
|
def _prepare_session() -> requests.Session:
|
|
"""Prepare a wetransfer.com session.
|
|
|
|
Return a requests session that will always pass the required headers
|
|
and with cookies properly populated that can be used for wetransfer
|
|
requests.
|
|
"""
|
|
s = requests.Session()
|
|
r = s.get('https://wetransfer.com/')
|
|
m = re.search('name="csrf-token" content="([^"]+)"', r.text)
|
|
s.headers.update({
|
|
'x-csrf-token': m.group(1),
|
|
'x-requested-with': 'XMLHttpRequest',
|
|
})
|
|
|
|
return s
|
|
|
|
|
|
def _prepare_email_upload(filenames: List[str], message: str,
|
|
sender: str, recipients: List[str],
|
|
session: requests.Session) -> str:
|
|
"""Given a list of filenames, message a sender and recipients prepare for
|
|
the email upload.
|
|
|
|
Return the parsed JSON response.
|
|
"""
|
|
j = {
|
|
"files": [_file_name_and_size(f) for f in filenames],
|
|
"from": sender,
|
|
"message": message,
|
|
"recipients": recipients,
|
|
"ui_language": "en",
|
|
}
|
|
|
|
r = session.post(WETRANSFER_UPLOAD_EMAIL_URL, json=j)
|
|
return r.json()
|
|
|
|
|
|
def _verify_email_upload(transfer_id: str, session: requests.Session) -> str:
|
|
"""Given a transfer_id, read the code from standard input.
|
|
|
|
Return the parsed JSON response.
|
|
"""
|
|
code = input('Code:')
|
|
|
|
j = {
|
|
"code": code,
|
|
"expire_in": WETRANSFER_EXPIRE_IN,
|
|
}
|
|
|
|
r = session.post(WETRANSFER_VERIFY_URL.format(transfer_id=transfer_id),
|
|
json=j)
|
|
return r.json()
|
|
|
|
|
|
def _prepare_link_upload(filenames: List[str], message: str,
|
|
session: requests.Session) -> str:
|
|
"""Given a list of filenames and a message prepare for the link upload.
|
|
|
|
Return the parsed JSON response.
|
|
"""
|
|
j = {
|
|
"files": [_file_name_and_size(f) for f in filenames],
|
|
"message": message,
|
|
"ui_language": "en",
|
|
}
|
|
|
|
r = session.post(WETRANSFER_UPLOAD_LINK_URL, json=j)
|
|
return r.json()
|
|
|
|
|
|
def _prepare_file_upload(transfer_id: str, file: str,
|
|
session: requests.Session) -> str:
|
|
"""Given a transfer_id and file prepare it for the upload.
|
|
|
|
Return the parsed JSON response.
|
|
"""
|
|
j = _file_name_and_size(file)
|
|
r = session.post(WETRANSFER_FILES_URL.format(transfer_id=transfer_id),
|
|
json=j)
|
|
return r.json()
|
|
|
|
|
|
def _upload_chunks(transfer_id: str, file_id: str, file: str,
|
|
session: requests.Session,
|
|
default_chunk_size: int = WETRANSFER_DEFAULT_CHUNK_SIZE) -> str:
|
|
"""Given a transfer_id, file_id and file upload it.
|
|
|
|
Return the parsed JSON response.
|
|
"""
|
|
f = open(file, 'rb')
|
|
|
|
chunk_number = 0
|
|
while True:
|
|
chunk = f.read(default_chunk_size)
|
|
chunk_size = len(chunk)
|
|
if chunk_size == 0:
|
|
break
|
|
chunk_number += 1
|
|
|
|
j = {
|
|
"chunk_crc": zlib.crc32(chunk),
|
|
"chunk_number": chunk_number,
|
|
"chunk_size": chunk_size,
|
|
"retries": 0
|
|
}
|
|
|
|
r = session.post(
|
|
WETRANSFER_PART_PUT_URL.format(transfer_id=transfer_id,
|
|
file_id=file_id),
|
|
json=j)
|
|
url = r.json().get('url')
|
|
requests.options(url,
|
|
headers={
|
|
'Origin': 'https://wetransfer.com',
|
|
'Access-Control-Request-Method': 'PUT',
|
|
})
|
|
requests.put(url, data=chunk)
|
|
|
|
j = {
|
|
'chunk_count': chunk_number
|
|
}
|
|
r = session.put(
|
|
WETRANSFER_FINALIZE_MPP_URL.format(transfer_id=transfer_id,
|
|
file_id=file_id),
|
|
json=j)
|
|
|
|
return r.json()
|
|
|
|
|
|
def _finalize_upload(transfer_id: str, session: requests.Session) -> str:
|
|
"""Given a transfer_id finalize the upload.
|
|
|
|
Return the parsed JSON response.
|
|
"""
|
|
r = session.put(WETRANSFER_FINALIZE_URL.format(transfer_id=transfer_id))
|
|
|
|
return r.json()
|
|
|
|
|
|
def upload(files: List[str], message: str = '', sender: str = None,
|
|
recipients: List[str] = []) -> str:
|
|
"""Given a list of files upload them and return the corresponding URL.
|
|
|
|
Also accepts optional parameters:
|
|
- `message': message used as a description of the transfer
|
|
- `sender': email address used to receive an ACK if the upload is
|
|
successfull. For every download by the recipients an email
|
|
will be also sent
|
|
- `recipients': list of email addresses of recipients. When the upload
|
|
succeed every recipients will receive an email with a link
|
|
|
|
If both sender and recipient parameters are passed the email upload will be
|
|
used. Otherwise, the link upload will be used.
|
|
|
|
Return the short URL of the transfer on success.
|
|
"""
|
|
|
|
# Check that all files exists
|
|
for f in files:
|
|
if not os.path.exists(f):
|
|
raise FileNotFoundError(f)
|
|
|
|
# Check that there are no duplicates filenames
|
|
# (despite possible different dirname())
|
|
filenames = [os.path.basename(f) for f in files]
|
|
if len(files) != len(set(filenames)):
|
|
raise FileExistsError('Duplicate filenames')
|
|
|
|
transfer_id = None
|
|
s = _prepare_session()
|
|
if sender and recipients:
|
|
# email upload
|
|
transfer_id = \
|
|
_prepare_email_upload(files, message, sender, recipients, s)['id']
|
|
_verify_email_upload(transfer_id, s)
|
|
else:
|
|
# link upload
|
|
transfer_id = _prepare_link_upload(files, message, s)['id']
|
|
|
|
for f in files:
|
|
file_id = _prepare_file_upload(transfer_id, f, s)['id']
|
|
_upload_chunks(transfer_id, file_id, f, s)
|
|
|
|
return _finalize_upload(transfer_id, s)['shortened_url']
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from sys import exit
|
|
import argparse
|
|
|
|
ap = argparse.ArgumentParser(
|
|
prog='transferwee',
|
|
description='Download/upload files via wetransfer.com'
|
|
)
|
|
sp = ap.add_subparsers(dest='action', help='action')
|
|
|
|
# download subcommand
|
|
dp = sp.add_parser('download', help='download files')
|
|
dp.add_argument('-g', action='store_true',
|
|
help='only print the direct link (without downloading it)')
|
|
dp.add_argument('-o', type=str, default='', metavar='file',
|
|
help='output file to be used')
|
|
dp.add_argument('url', nargs='+', type=str, metavar='url',
|
|
help='URL (we.tl/... or wetransfer.com/downloads/...)')
|
|
|
|
# upload subcommand
|
|
up = sp.add_parser('upload', help='upload files')
|
|
up.add_argument('-m', type=str, default='', metavar='message',
|
|
help='message description for the transfer')
|
|
up.add_argument('-f', type=str, metavar='from', help='sender email')
|
|
up.add_argument('-t', nargs='+', type=str, metavar='to',
|
|
help='recipient emails')
|
|
up.add_argument('files', nargs='+', type=str, metavar='file',
|
|
help='files to upload')
|
|
|
|
args = ap.parse_args()
|
|
|
|
if args.action == 'download':
|
|
if args.g:
|
|
for u in args.url:
|
|
print(download_url(u))
|
|
else:
|
|
for u in args.url:
|
|
download(u, args.o)
|
|
exit(0)
|
|
|
|
if args.action == 'upload':
|
|
print(upload(args.files, args.m, args.f, args.t))
|
|
exit(0)
|
|
|
|
# No action selected, print help message
|
|
ap.print_help()
|
|
exit(1) |