23 Commits
0.2 ... 0.3.0

Author SHA1 Message Date
r4sas
682b47fbd3 0.3.0
New:
* Added validation of PrivateBin instance URL - #18 (it must contain trailing slash because POST is used)
* URL shortener support with various supported services - #19
* Shortener configuration, certificate validation and insecure warning settings can be configured in config file or via env

Changed:
* Restructured some parts of code by splitting big code chunks in funtions (encrypt/decrypt)
* Rework error messaging repeatable code (moved in utils)
* Reduce code duplication (requests session configuring)

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-20 10:50:06 +00:00
r4sas
9d82c727b6 [shortener] move service-depend init code to functions
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-20 06:12:10 +00:00
r4sas
c425d86ed6 [shortener] remove not completed bitly support code
Anyone can request to add support later, so for now I removing that code due to lack of use.

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-20 06:00:08 +00:00
r4sas
3ed06686ab Codacy: fix W0106
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-19 00:10:39 +00:00
r4sas
7e4fb0a8c4 split encrypt and decrypt code in separated functions by paste version
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 23:48:46 +00:00
r4sas
ebfe0c48a0 [shortener] separate services related code in functions
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 23:22:56 +00:00
r4sas
45d854e590 [shortener] add is.gd, v.gd and cutt.ly services support
Start realisation of bitly support.

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 21:09:16 +00:00
r4sas
18d79c8e04 add .gitattributes
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 19:07:15 +00:00
r4sas
92c38344e3 add tinyurl support
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 19:01:41 +00:00
r4sas
b596f42b7e fix response handler (closes #19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 13:46:30 +00:00
r4sas
86c5051fcf move 'requests' configuration code in separate function
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 12:02:41 +00:00
r4sas
f838f8ee94 fix codacy issues
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 11:43:44 +00:00
r4sas
fb7a93732d rewrite yourls response handler (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 11:27:15 +00:00
r4sas
181763070c remove unused import, replace comment definition
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 10:22:58 +00:00
r4sas
c3a491ac46 correct response store for yourls (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 10:14:44 +00:00
r4sas
641c55a6a2 fix dict merging (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 13:22:28 +00:00
r4sas
432675f2e6 url shortener support (#19)
Currently tested only with clck.ru service. YOURLS test needed.

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 10:54:58 +00:00
r4sas
635c87dabd [wip] url shortener support (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 08:55:59 +00:00
r4sas
6b6c33e545 [wip] url shortener support (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 05:22:57 +00:00
r4sas
7c5ba2fdbe add URL validation for trailing slash (closes #18)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-09 14:23:30 +00:00
r4sas
692335ee62 0.2.1
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-08-16 21:24:52 +00:00
R4SAS
c63256c628 Merge pull request #16 from firecat53/patch-1
Fix swapped "open discussion" and "burn after reading" options order
2019-08-17 00:12:49 +03:00
Scott Hansen
66659da66d Update format.py
The --burn and --discuss flags were switched. This change fixes that.
2019-08-16 12:59:36 -07:00
7 changed files with 375 additions and 142 deletions

3
.gitattributes vendored Normal file
View File

@@ -0,0 +1,3 @@
.gitattributes export-ignore
.gitignore export-ignore
README.md export-ignore

View File

@@ -2,6 +2,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
__author__ = "R4SAS <r4sas@i2pmail.org>" __author__ = "R4SAS <r4sas@i2pmail.org>"
__version__ = "0.2" __version__ = "0.3.0"
__copyright__ = "Copyright (c) R4SAS" __copyright__ = "Copyright (c) R4SAS"
__license__ = "MIT" __license__ = "MIT"

View File

@@ -1,14 +1,18 @@
from pbincli.format import Paste from pbincli.format import Paste
from pbincli.utils import PBinCLIError
def send(args, api_client, settings=None):
from pbincli.api import Shortener
if args.short:
shortener = Shortener(settings)
def send(args, api_client):
if not args.notext: if not args.notext:
if args.text: if args.text:
text = args.text text = args.text
elif args.stdin: elif args.stdin:
text = args.stdin.read() text = args.stdin.read()
elif not args.file: elif not args.file:
print("Nothing to send!") PBinCLIError("Nothing to send!")
exit(1)
else: else:
text = "" text = ""
@@ -60,29 +64,32 @@ def send(args, api_client):
result['id'], result['id'],
passphrase, passphrase,
result['deletetoken'], result['deletetoken'],
api_client.server, settings['server'],
result['id'], result['id'],
passphrase)) passphrase))
elif result['status']: # return code is other then zero elif result['status']: # return code is other then zero
print("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
exit(1)
else: # or here no status field in response or it is empty else: # or here no status field in response or it is empty
print("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
exit(1)
if args.short:
print("\nQuerying URL shortening service...")
shortener.getlink("{}?{}#{}".format(
settings['server'],
result['id'],
passphrase))
def get(args, api_client): def get(args, api_client, settings=None):
from pbincli.utils import check_writable, json_encode from pbincli.utils import check_writable, json_encode
try: try:
pasteid, passphrase = args.pasteinfo.split("#") pasteid, passphrase = args.pasteinfo.split("#")
except ValueError: except ValueError:
print("PBinCLI error: provided info hasn't contain valid PasteID#Passphrase string") PBinCLIError("Provided info hasn't contain valid PasteID#Passphrase string")
exit(1)
if not (pasteid and passphrase): if not (pasteid and passphrase):
print("PBinCLI error: Incorrect request") PBinCLIError("Incorrect request")
exit(1)
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase)) if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
@@ -139,14 +146,12 @@ def get(args, api_client):
api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'})) api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'}))
elif result['status']: # return code is other then zero elif result['status']: # return code is other then zero
print("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
exit(1)
else: # or here no status field in response or it is empty else: # or here no status field in response or it is empty
print("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
exit(1)
def delete(args, api_client): def delete(args, api_client, settings=None):
from pbincli.utils import json_encode from pbincli.utils import json_encode
pasteid = args.paste pasteid = args.paste

View File

@@ -1,21 +1,29 @@
import requests import requests
from requests import HTTPError
from pbincli.utils import PBinCLIError
def _config_requests(settings=None):
if settings['proxy']:
proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
else:
proxy = {}
if settings['no_insecure_warning']:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
session = requests.Session()
session.verify = not settings['no_check_certificate']
return session, proxy
class PrivateBin: class PrivateBin:
def __init__(self, server, settings=None): def __init__(self, settings=None):
self.server = server self.server = settings['server']
self.headers = {'X-Requested-With': 'JSONHttpRequest'} self.headers = {'X-Requested-With': 'JSONHttpRequest'}
if settings['proxy']: self.session, self.proxy = _config_requests(settings)
self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
else:
self.proxy = {}
if settings['noinsecurewarn']:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
self.session = requests.Session()
self.session.verify = settings['nocheckcert']
def post(self, request): def post(self, request):
result = self.session.post( result = self.session.post(
@@ -27,8 +35,7 @@ class PrivateBin:
try: try:
return result.json() return result.json()
except ValueError: except ValueError:
print("ERROR: Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text)) PBinCLIError("Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text))
exit(1)
def get(self, request): def get(self, request):
@@ -56,11 +63,9 @@ class PrivateBin:
if not result['status']: if not result['status']:
print("Paste successfully deleted!") print("Paste successfully deleted!")
elif result['status']: elif result['status']:
print("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
exit(1)
else: else:
print("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
exit(1)
def getVersion(self): def getVersion(self):
@@ -72,3 +77,172 @@ class PrivateBin:
'v' in jsonldSchema['@context'] and 'v' in jsonldSchema['@context'] and
'@value' in jsonldSchema['@context']['v']) \ '@value' in jsonldSchema['@context']['v']) \
else 1 else 1
class Shortener:
"""Some parts of this class was taken from
python-yourls (https://github.com/tflink/python-yourls/) library
"""
def __init__(self, settings=None):
self.api = settings['short_api']
# we checking which service is used, because some services doesn't require
# any authentication, or have only one domain on which it working
if self.api == 'yourls':
self._yourls_init(settings)
elif self.api == 'isgd' or self.api == 'vgd':
self._gd_init()
self.session, self.proxy = _config_requests(settings)
def _yourls_init(self, settings):
if not settings['short_url']:
PBinCLIError("YOURLS: An API URL is required")
# setting API URL
apiurl = settings['short_url']
if apiurl.endswith('/yourls-api.php'):
self.apiurl = apiurl
elif apiurl.endswith('/'):
self.apiurl = apiurl + 'yourls-api.php'
else:
PBinCLIError("YOURLS: Incorrect URL is provided.\n" +
"It must contain full address to 'yourls-api.php' script (like https://example.com/yourls-api.php)\n" +
"or just contain instance URL with '/' at the end (like https://example.com/)")
# validating for required credentials
if settings['short_user'] and settings['short_pass'] and settings['short_token'] is None:
self.auth_args = {'username': settings['short_user'], 'password': settings['short_pass']}
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token']:
self.auth_args = {'signature': settings['short_token']}
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token'] is None:
self.auth_args = {}
else:
PBinCLIError("YOURLS: either username and password or token are required. Otherwise set to default (None)")
def _gd_init(self):
if self.api == 'isgd':
self.apiurl = 'https://is.gd/'
else:
self.apiurl = 'https://v.gd/'
self.useragent = 'Mozilla/5.0 (compatible; pbincli - https://github.com/r4sas/pbincli/)'
def getlink(self, url):
# that is api -> function mapper for running service-related function when getlink() used
servicesList = {
'yourls': self._yourls,
'clckru': self._clckru,
'tinyurl': self._tinyurl,
'isgd': self._gd,
'vgd': self._gd,
'cuttly': self._cuttly
}
# run function selected by choosen API
servicesList[self.api](url)
def _yourls(self,url):
request = {'action': 'shorturl', 'format': 'json', 'url': url}
request.update(self.auth_args)
result = self.session.post(
url = self.apiurl,
proxies = self.proxy,
data = request)
try:
result.raise_for_status()
except HTTPError:
try:
response = result.json()
except ValueError:
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
else:
PBinCLIError("YOURLS: Received error from API: {} with JSON {}".format(result, response))
else:
response = result.json()
if {'status', 'statusCode', 'message'} <= set(response.keys()):
if response['status'] == 'fail':
PBinCLIError("YOURLS: Received error from API: {}".format(response['message']))
if not 'shorturl' in response:
PBinCLIError("YOURLS: Unknown error: {}".format(response['message']))
else:
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(response))
def _clckru(self, url):
request = {'url': url}
try:
result = self.session.post(
url = "https://clck.ru/--",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex))
def _tinyurl(self, url):
request = {'url': url}
try:
result = self.session.post(
url = "https://tinyurl.com/api-create.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex))
def _gd(self, url):
request = {
'format': 'json',
'url': url,
'logstats': 0 # we don't want use any statistics
}
headers = { 'User-Agent': self.useragent}
try:
result = self.session.post(
url = self.apiurl + "create.php",
headers = headers,
proxies = self.proxy,
data = request)
response = result.json()
if 'shorturl' in response:
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("{}: got error {} from API: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
response['errorcode'],
response['errormessage']))
except Exception as ex:
PBinCLIError("{}: unexcepted behavior: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
ex))
def _cuttly(self, url):
request = {
'url': url,
'domain': 0
}
try:
result = self.session.post(
url = "https://cutt.ly/scripts/shortenUrl.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex))

View File

@@ -3,7 +3,7 @@ import os, sys, argparse
import pbincli.actions import pbincli.actions
from pbincli.api import PrivateBin from pbincli.api import PrivateBin
from pbincli.utils import PBinCLIException from pbincli.utils import PBinCLIException, validate_url
CONFIG_PATHS = [os.path.join(".", "pbincli.conf", ), CONFIG_PATHS = [os.path.join(".", "pbincli.conf", ),
os.path.join(os.getenv("HOME") or "~", ".config", "pbincli", "pbincli.conf") ] os.path.join(os.getenv("HOME") or "~", ".config", "pbincli", "pbincli.conf") ]
@@ -36,8 +36,19 @@ def main():
send_parser.add_argument("-q", "--notext", default=False, action="store_true", help="don't send text in paste") send_parser.add_argument("-q", "--notext", default=False, action="store_true", help="don't send text in paste")
send_parser.add_argument("-c", "--compression", default="zlib", action="store", send_parser.add_argument("-c", "--compression", default="zlib", action="store",
choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format") choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format")
send_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") # URL shortener
send_parser.add_argument("-S", "--short", default=False, action="store_true", help="use URL shortener")
send_parser.add_argument("--short-api", default=argparse.SUPPRESS, action="store", choices=["tinyurl", "clckru", "isgd", "vgd", "cuttly", "yourls"], help="API used by shortener service")
send_parser.add_argument("--short-url", default=argparse.SUPPRESS, help="URL of shortener service API")
send_parser.add_argument("--short-user", default=argparse.SUPPRESS, help="Shortener username")
send_parser.add_argument("--short-pass", default=argparse.SUPPRESS, help="Shortener password")
send_parser.add_argument("--short-token", default=argparse.SUPPRESS, help="Shortener token")
# Connection options
send_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)")
send_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)")
send_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
#
send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
send_parser.add_argument("--dry", default=False, action="store_true", help="invoke dry run") send_parser.add_argument("--dry", default=False, action="store_true", help="invoke dry run")
send_parser.add_argument("stdin", help="input paste text from stdin", nargs="?", type=argparse.FileType("r"), default=sys.stdin) send_parser.add_argument("stdin", help="input paste text from stdin", nargs="?", type=argparse.FileType("r"), default=sys.stdin)
@@ -47,7 +58,7 @@ def main():
get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance") get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance")
get_parser.add_argument("pasteinfo", help="example: aabb#cccddd") get_parser.add_argument("pasteinfo", help="example: aabb#cccddd")
get_parser.add_argument("-p", "--password", help="password for decrypting paste") get_parser.add_argument("-p", "--password", help="password for decrypting paste")
get_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") get_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
get_parser.set_defaults(func=pbincli.actions.get) get_parser.set_defaults(func=pbincli.actions.get)
@@ -56,7 +67,7 @@ def main():
delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token") delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token")
delete_parser.add_argument("-p", "--paste", required=True, help="paste id") delete_parser.add_argument("-p", "--paste", required=True, help="paste id")
delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token") delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token")
delete_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") delete_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
delete_parser.set_defaults(func=pbincli.actions.delete) delete_parser.set_defaults(func=pbincli.actions.delete)
@@ -65,10 +76,23 @@ def main():
args = parser.parse_args() args = parser.parse_args()
CONFIG = { CONFIG = {
"server": "https://paste.i2pd.xyz/", 'server': 'https://paste.i2pd.xyz/',
"proxy": None 'proxy': None,
'short_api': None,
'short_url': None,
'short_user': None,
'short_pass': None,
'short_token': None,
'no_check_certificate': False,
'no_insecure_warning': False
} }
# Configuration preference order:
# 1. Command line switches
# 2. Environment variables
# 3. Configuration file
# 4. Default values below
for p in CONFIG_PATHS: for p in CONFIG_PATHS:
if os.path.exists(p): if os.path.exists(p):
CONFIG.update(read_config(p)) CONFIG.update(read_config(p))
@@ -77,21 +101,21 @@ def main():
for key in CONFIG.keys(): for key in CONFIG.keys():
var = "PRIVATEBIN_{}".format(key.upper()) var = "PRIVATEBIN_{}".format(key.upper())
if var in os.environ: CONFIG[key] = os.getenv(var) if var in os.environ: CONFIG[key] = os.getenv(var)
# values from command line switches are preferred
args_var = vars(args)
if key in args_var:
CONFIG[key] = args_var[key]
SETTINGS = { # Re-validate PrivateBin instance URL
"proxy": CONFIG["proxy"], CONFIG['server'] = validate_url(CONFIG['server'])
"nocheckcert": args.no_check_certificate,
"noinsecurewarn": args.no_insecure_warning
}
api_client = PrivateBin(CONFIG["server"], settings=SETTINGS) api_client = PrivateBin(CONFIG)
if hasattr(args, "func"): if hasattr(args, "func"):
try: try:
args.func(args, api_client) args.func(args, api_client, settings=CONFIG)
except PBinCLIException as pe: except PBinCLIException as pe:
print("PBinCLI error: {}".format(pe)) raise PBinCLIException("error: {}".format(pe))
sys.exit(1)
else: else:
parser.print_help() parser.print_help()

View File

@@ -1,7 +1,7 @@
from Crypto.Random import get_random_bytes from Crypto.Random import get_random_bytes
from Crypto.Cipher import AES from Crypto.Cipher import AES
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
from pbincli.utils import PBinCLIException from pbincli.utils import PBinCLIError
import zlib import zlib
CIPHER_ITERATION_COUNT = 100000 CIPHER_ITERATION_COUNT = 100000
@@ -143,7 +143,7 @@ class Paste:
elif self._version == 1: elif self._version == 1:
return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS) return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
else: else:
raise PBinCLIException('Unknown compression type provided in paste!') PBinCLIError('Unknown compression type provided in paste!')
def __compress(self, s): def __compress(self, s):
@@ -160,115 +160,131 @@ class Paste:
b = co.compress(s) + co.flush() b = co.compress(s) + co.flush()
return b64encode(''.join(map(chr, b)).encode('utf-8')) return b64encode(''.join(map(chr, b)).encode('utf-8'))
else: else:
raise PBinCLIException('Unknown compression type provided!') PBinCLIError('Unknown compression type provided!')
def decrypt(self): def decrypt(self):
# that is wrapper which running needed function regrading to paste version
if self._version == 2: self._decryptV2()
else: self._decryptV1()
def _decryptV2(self):
from json import loads as json_decode
iv = b64decode(self._data['adata'][0][0])
salt = b64decode(self._data['adata'][0][1])
key = self.__deriveKey(salt)
# Get compression type from received paste
self._compression = self._data['adata'][0][7]
cipher = self.__initializeCipher(key, iv, self._data['adata'])
# Cut the cipher text into message and tag
cipher_text_tag = b64decode(self._data['ct'])
cipher_text = cipher_text_tag[:-CIPHER_TAG_BYTES]
cipher_tag = cipher_text_tag[-CIPHER_TAG_BYTES:]
cipher_message = json_decode(self.__decompress(cipher.decrypt_and_verify(cipher_text, cipher_tag)).decode())
self._text = cipher_message['paste'].encode()
if 'attachment' in cipher_message and 'attachment_name' in cipher_message:
self._attachment = cipher_message['attachment']
self._attachment_name = cipher_message['attachment_name']
def _decryptV1(self):
from sjcl import SJCL
from json import loads as json_decode from json import loads as json_decode
if self._version == 2: password = self.__preparePassKey()
iv = b64decode(self._data['adata'][0][0]) cipher_text = json_decode(self._data['data'])
salt = b64decode(self._data['adata'][0][1]) if self._debug: print("Text:\t{}\n".format(cipher_text))
key = self.__deriveKey(salt)
# Get compression type from received paste text = SJCL().decrypt(cipher_text, password)
self._compression = self._data['adata'][0][7]
cipher = self.__initializeCipher(key, iv, self._data['adata']) if len(text):
# Cut the cipher text into message and tag if self._debug: print("Decoded Text:\t{}\n".format(text))
cipher_text_tag = b64decode(self._data['ct']) self._text = self.__decompress(text.decode())
cipher_text = cipher_text_tag[:-CIPHER_TAG_BYTES]
cipher_tag = cipher_text_tag[-CIPHER_TAG_BYTES:]
cipher_message = json_decode(self.__decompress(cipher.decrypt_and_verify(cipher_text, cipher_tag)).decode())
self._text = cipher_message['paste'].encode() if 'attachment' in self._data and 'attachmentname' in self._data:
cipherfile = json_decode(self._data['attachment'])
cipherfilename = json_decode(self._data['attachmentname'])
if 'attachment' in cipher_message and 'attachment_name' in cipher_message: if self._debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile))
self._attachment = cipher_message['attachment']
self._attachment_name = cipher_message['attachment_name']
else:
from sjcl import SJCL
password = self.__preparePassKey() attachment = SJCL().decrypt(cipherfile, password)
attachmentname = SJCL().decrypt(cipherfilename, password)
cipher_text = json_decode(self._data['data']) self._attachment = self.__decompress(attachment.decode('utf-8')).decode('utf-8')
self._attachment_name = self.__decompress(attachmentname.decode('utf-8')).decode('utf-8')
if self._debug: print("Text:\t{}\n".format(cipher_text))
text = SJCL().decrypt(cipher_text, password)
if len(text):
if self._debug: print("Decoded Text:\t{}\n".format(text))
self._text = self.__decompress(text.decode())
if 'attachment' in self._data and 'attachmentname' in self._data:
cipherfile = json_decode(self._data['attachment'])
cipherfilename = json_decode(self._data['attachmentname'])
if self._debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile))
attachment = SJCL().decrypt(cipherfile, password)
attachmentname = SJCL().decrypt(cipherfilename, password)
self._attachment = self.__decompress(attachment.decode('utf-8')).decode('utf-8')
self._attachment_name = self.__decompress(attachmentname.decode('utf-8')).decode('utf-8')
def encrypt(self, formatter, burnafterreading, discussion, expiration): def encrypt(self, formatter, burnafterreading, discussion, expiration):
# that is wrapper which running needed function regrading to paste version
self._formatter = formatter
self._burnafterreading = burnafterreading
self._discussion = discussion
self._expiration = expiration
if self._version == 2: self._encryptV2()
else: self._encryptV1()
def _encryptV2(self):
from pbincli.utils import json_encode from pbincli.utils import json_encode
if self._version == 2:
iv = get_random_bytes(CIPHER_TAG_BYTES)
salt = get_random_bytes(CIPHER_SALT_BYTES)
key = self.__deriveKey(salt)
# prepare encryption authenticated data and message iv = get_random_bytes(CIPHER_TAG_BYTES)
adata = [ salt = get_random_bytes(CIPHER_SALT_BYTES)
[ key = self.__deriveKey(salt)
b64encode(iv).decode(),
b64encode(salt).decode(),
CIPHER_ITERATION_COUNT,
CIPHER_BLOCK_BITS,
CIPHER_TAG_BITS,
'aes',
'gcm',
self._compression
],
formatter,
int(burnafterreading),
int(discussion)
]
cipher_message = {'paste':self._text}
if self._attachment:
cipher_message['attachment'] = self._attachment
cipher_message['attachment_name'] = self._attachment_name
cipher = self.__initializeCipher(key, iv, adata) # prepare encryption authenticated data and message
ciphertext, tag = cipher.encrypt_and_digest(self.__compress(json_encode(cipher_message))) adata = [
[
b64encode(iv).decode(),
b64encode(salt).decode(),
CIPHER_ITERATION_COUNT,
CIPHER_BLOCK_BITS,
CIPHER_TAG_BITS,
'aes',
'gcm',
self._compression
],
self._formatter,
int(self._discussion),
int(self._burnafterreading)
]
cipher_message = {'paste':self._text}
if self._attachment:
cipher_message['attachment'] = self._attachment
cipher_message['attachment_name'] = self._attachment_name
self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':expiration}} cipher = self.__initializeCipher(key, iv, adata)
ciphertext, tag = cipher.encrypt_and_digest(self.__compress(json_encode(cipher_message)))
else: self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':self._expiration}}
from sjcl import SJCL
self._data = {'expire':expiration,'formatter':formatter,'burnafterreading':int(burnafterreading),'opendiscussion':int(discussion)}
password = self.__preparePassKey() def _encryptV1(self):
from sjcl import SJCL
from pbincli.utils import json_encode
if self._debug: print("Password:\t{}".format(password)) self._data = {'expire':self._expiration,'formatter':self._formatter,'burnafterreading':int(self._burnafterreading),'opendiscussion':int(self._discussion)}
# Encrypting text password = self.__preparePassKey()
cipher = SJCL().encrypt(self.__compress(self._text.encode('utf-8')), password, mode='gcm') if self._debug: print("Password:\t{}".format(password))
for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode()
self._data['data'] = json_encode(cipher) # Encrypting text
cipher = SJCL().encrypt(self.__compress(self._text.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode()
if self._attachment: self._data['data'] = json_encode(cipher)
cipherfile = SJCL().encrypt(self.__compress(self._attachment.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode()
cipherfilename = SJCL().encrypt(self.__compress(self._attachment_name.encode('utf-8')), password, mode='gcm') if self._attachment:
for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode() cipherfile = SJCL().encrypt(self.__compress(self._attachment.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode()
self._data['attachment'] = json_encode(cipherfile) cipherfilename = SJCL().encrypt(self.__compress(self._attachment_name.encode('utf-8')), password, mode='gcm')
self._data['attachmentname'] = json_encode(cipherfilename) for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode()
self._data['attachment'] = json_encode(cipherfile)
self._data['attachmentname'] = json_encode(cipherfilename)

View File

@@ -1,9 +1,14 @@
import json, ntpath, os import json, ntpath, os, sys
class PBinCLIException(Exception): class PBinCLIException(Exception):
pass pass
def PBinCLIError(message):
print("PBinCLI Error: {}".format(message), file=sys.stderr)
exit(1)
def path_leaf(path): def path_leaf(path):
head, tail = ntpath.split(path) head, tail = ntpath.split(path)
return tail or ntpath.basename(head) return tail or ntpath.basename(head)
@@ -12,14 +17,20 @@ def path_leaf(path):
def check_readable(f): def check_readable(f):
# Checks if path exists and readable # Checks if path exists and readable
if not os.path.exists(f) or not os.access(f, os.R_OK): if not os.path.exists(f) or not os.access(f, os.R_OK):
raise PBinCLIException("Error accessing path: {}".format(f)) PBinCLIError("Error accessing path: {}".format(f))
def check_writable(f): def check_writable(f):
# Checks if path is writable # Checks if path is writable
if not os.access(os.path.dirname(f) or ".", os.W_OK): if not os.access(os.path.dirname(f) or ".", os.W_OK):
raise PBinCLIException("Path is not writable: {}".format(f)) PBinCLIError("Path is not writable: {}".format(f))
def json_encode(s): def json_encode(s):
return json.dumps(s, separators=(',',':')).encode() return json.dumps(s, separators=(',',':')).encode()
def validate_url(s):
if not s.endswith('/'):
s = s + "/"
return s