32 Commits

Author SHA1 Message Date
dependabot-preview[bot]
051f259fa7 Upgrade to GitHub-native Dependabot 2021-04-29 21:00:42 +00:00
r4sas
94023a986d remove unused exception variable
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-06-22 11:33:34 +00:00
r4sas
5909e7330b validate config file for empty lines (closes #24)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-06-22 11:22:21 +00:00
r4sas
d0f23094bc update gitignore
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-06-19 14:35:51 +00:00
r4sas
86061e595c do not calculate tag size twice
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-06-04 15:31:36 +00:00
r4sas
49362c70a6 get encryption options from paste (closes #22)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-06-04 15:29:04 +00:00
r4sas
70328903fa include license and requirements in source archive
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-04-11 17:59:19 +00:00
r4sas
29498b9315 make codacy happy
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-01-07 00:58:19 +00:00
r4sas
19f130feb1 add launch arguments for get and delete, check if pycrypto used
Signed-off-by: r4sas <r4sas@i2pmail.org>
2020-01-06 18:22:11 +00:00
r4sas
682b47fbd3 0.3.0
New:
* Added validation of PrivateBin instance URL - #18 (it must contain trailing slash because POST is used)
* URL shortener support with various supported services - #19
* Shortener configuration, certificate validation and insecure warning settings can be configured in config file or via env

Changed:
* Restructured some parts of code by splitting big code chunks in funtions (encrypt/decrypt)
* Rework error messaging repeatable code (moved in utils)
* Reduce code duplication (requests session configuring)

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-20 10:50:06 +00:00
r4sas
9d82c727b6 [shortener] move service-depend init code to functions
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-20 06:12:10 +00:00
r4sas
c425d86ed6 [shortener] remove not completed bitly support code
Anyone can request to add support later, so for now I removing that code due to lack of use.

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-20 06:00:08 +00:00
r4sas
3ed06686ab Codacy: fix W0106
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-19 00:10:39 +00:00
r4sas
7e4fb0a8c4 split encrypt and decrypt code in separated functions by paste version
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 23:48:46 +00:00
r4sas
ebfe0c48a0 [shortener] separate services related code in functions
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 23:22:56 +00:00
r4sas
45d854e590 [shortener] add is.gd, v.gd and cutt.ly services support
Start realisation of bitly support.

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 21:09:16 +00:00
r4sas
18d79c8e04 add .gitattributes
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 19:07:15 +00:00
r4sas
92c38344e3 add tinyurl support
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 19:01:41 +00:00
r4sas
b596f42b7e fix response handler (closes #19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 13:46:30 +00:00
r4sas
86c5051fcf move 'requests' configuration code in separate function
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 12:02:41 +00:00
r4sas
f838f8ee94 fix codacy issues
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 11:43:44 +00:00
r4sas
fb7a93732d rewrite yourls response handler (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 11:27:15 +00:00
r4sas
181763070c remove unused import, replace comment definition
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 10:22:58 +00:00
r4sas
c3a491ac46 correct response store for yourls (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-18 10:14:44 +00:00
r4sas
641c55a6a2 fix dict merging (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 13:22:28 +00:00
r4sas
432675f2e6 url shortener support (#19)
Currently tested only with clck.ru service. YOURLS test needed.

Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 10:54:58 +00:00
r4sas
635c87dabd [wip] url shortener support (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 08:55:59 +00:00
r4sas
6b6c33e545 [wip] url shortener support (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-17 05:22:57 +00:00
r4sas
7c5ba2fdbe add URL validation for trailing slash (closes #18)
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-09-09 14:23:30 +00:00
r4sas
692335ee62 0.2.1
Signed-off-by: r4sas <r4sas@i2pmail.org>
2019-08-16 21:24:52 +00:00
R4SAS
c63256c628 Merge pull request #16 from firecat53/patch-1
Fix swapped "open discussion" and "burn after reading" options order
2019-08-17 00:12:49 +03:00
Scott Hansen
66659da66d Update format.py
The --burn and --discuss flags were switched. This change fixes that.
2019-08-16 12:59:36 -07:00
12 changed files with 470 additions and 159 deletions

3
.gitattributes vendored Normal file
View File

@@ -0,0 +1,3 @@
.gitattributes export-ignore
.gitignore export-ignore
README.md export-ignore

8
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,8 @@
version: 2
updates:
- package-ecosystem: pip
directory: "/"
schedule:
interval: daily
time: "17:00"
open-pull-requests-limit: 10

3
.gitignore vendored
View File

@@ -131,3 +131,6 @@ venv.bak/
# PyCharm project settings # PyCharm project settings
.idea .idea
# PBinCLI downloaded paste text
paste-*.txt

3
MANIFEST.in Normal file
View File

@@ -0,0 +1,3 @@
include LICENSE
include README.rst
include requirements.txt

View File

@@ -2,6 +2,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
__author__ = "R4SAS <r4sas@i2pmail.org>" __author__ = "R4SAS <r4sas@i2pmail.org>"
__version__ = "0.2" __version__ = "0.3.1"
__copyright__ = "Copyright (c) R4SAS" __copyright__ = "Copyright (c) R4SAS"
__license__ = "MIT" __license__ = "MIT"

View File

@@ -1,14 +1,26 @@
from pbincli.format import Paste from pbincli.format import Paste
from pbincli.utils import PBinCLIError
import signal
def signal_handler(sig, frame):
print('Keyboard interrupt received, terminating...')
exit(0)
signal.signal(signal.SIGINT, signal_handler)
def send(args, api_client, settings=None):
from pbincli.api import Shortener
if args.short:
shortener = Shortener(settings)
def send(args, api_client):
if not args.notext: if not args.notext:
if args.text: if args.text:
text = args.text text = args.text
elif args.stdin: elif args.stdin:
text = args.stdin.read() text = args.stdin.read()
elif not args.file: elif not args.file:
print("Nothing to send!") PBinCLIError("Nothing to send!")
exit(1)
else: else:
text = "" text = ""
@@ -60,29 +72,32 @@ def send(args, api_client):
result['id'], result['id'],
passphrase, passphrase,
result['deletetoken'], result['deletetoken'],
api_client.server, settings['server'],
result['id'], result['id'],
passphrase)) passphrase))
elif result['status']: # return code is other then zero elif result['status']: # return code is other then zero
print("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
exit(1)
else: # or here no status field in response or it is empty else: # or here no status field in response or it is empty
print("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
exit(1)
if args.short:
print("\nQuerying URL shortening service...")
shortener.getlink("{}?{}#{}".format(
settings['server'],
result['id'],
passphrase))
def get(args, api_client): def get(args, api_client, settings=None):
from pbincli.utils import check_writable, json_encode from pbincli.utils import check_writable, json_encode
try: try:
pasteid, passphrase = args.pasteinfo.split("#") pasteid, passphrase = args.pasteinfo.split("#")
except ValueError: except ValueError:
print("PBinCLI error: provided info hasn't contain valid PasteID#Passphrase string") PBinCLIError("Provided info hasn't contain valid PasteID#Passphrase string")
exit(1)
if not (pasteid and passphrase): if not (pasteid and passphrase):
print("PBinCLI error: Incorrect request") PBinCLIError("Incorrect request")
exit(1)
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase)) if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
@@ -139,14 +154,12 @@ def get(args, api_client):
api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'})) api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'}))
elif result['status']: # return code is other then zero elif result['status']: # return code is other then zero
print("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
exit(1)
else: # or here no status field in response or it is empty else: # or here no status field in response or it is empty
print("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
exit(1)
def delete(args, api_client): def delete(args, api_client, settings=None):
from pbincli.utils import json_encode from pbincli.utils import json_encode
pasteid = args.paste pasteid = args.paste

View File

@@ -1,21 +1,29 @@
import requests import requests
from requests import HTTPError
from pbincli.utils import PBinCLIError
def _config_requests(settings=None):
if settings['proxy']:
proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
else:
proxy = {}
if settings['no_insecure_warning']:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
session = requests.Session()
session.verify = not settings['no_check_certificate']
return session, proxy
class PrivateBin: class PrivateBin:
def __init__(self, server, settings=None): def __init__(self, settings=None):
self.server = server self.server = settings['server']
self.headers = {'X-Requested-With': 'JSONHttpRequest'} self.headers = {'X-Requested-With': 'JSONHttpRequest'}
if settings['proxy']: self.session, self.proxy = _config_requests(settings)
self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
else:
self.proxy = {}
if settings['noinsecurewarn']:
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
self.session = requests.Session()
self.session.verify = settings['nocheckcert']
def post(self, request): def post(self, request):
result = self.session.post( result = self.session.post(
@@ -27,8 +35,7 @@ class PrivateBin:
try: try:
return result.json() return result.json()
except ValueError: except ValueError:
print("ERROR: Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text)) PBinCLIError("Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text))
exit(1)
def get(self, request): def get(self, request):
@@ -56,11 +63,9 @@ class PrivateBin:
if not result['status']: if not result['status']:
print("Paste successfully deleted!") print("Paste successfully deleted!")
elif result['status']: elif result['status']:
print("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
exit(1)
else: else:
print("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
exit(1)
def getVersion(self): def getVersion(self):
@@ -72,3 +77,172 @@ class PrivateBin:
'v' in jsonldSchema['@context'] and 'v' in jsonldSchema['@context'] and
'@value' in jsonldSchema['@context']['v']) \ '@value' in jsonldSchema['@context']['v']) \
else 1 else 1
class Shortener:
"""Some parts of this class was taken from
python-yourls (https://github.com/tflink/python-yourls/) library
"""
def __init__(self, settings=None):
self.api = settings['short_api']
# we checking which service is used, because some services doesn't require
# any authentication, or have only one domain on which it working
if self.api == 'yourls':
self._yourls_init(settings)
elif self.api == 'isgd' or self.api == 'vgd':
self._gd_init()
self.session, self.proxy = _config_requests(settings)
def _yourls_init(self, settings):
if not settings['short_url']:
PBinCLIError("YOURLS: An API URL is required")
# setting API URL
apiurl = settings['short_url']
if apiurl.endswith('/yourls-api.php'):
self.apiurl = apiurl
elif apiurl.endswith('/'):
self.apiurl = apiurl + 'yourls-api.php'
else:
PBinCLIError("YOURLS: Incorrect URL is provided.\n" +
"It must contain full address to 'yourls-api.php' script (like https://example.com/yourls-api.php)\n" +
"or just contain instance URL with '/' at the end (like https://example.com/)")
# validating for required credentials
if settings['short_user'] and settings['short_pass'] and settings['short_token'] is None:
self.auth_args = {'username': settings['short_user'], 'password': settings['short_pass']}
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token']:
self.auth_args = {'signature': settings['short_token']}
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token'] is None:
self.auth_args = {}
else:
PBinCLIError("YOURLS: either username and password or token are required. Otherwise set to default (None)")
def _gd_init(self):
if self.api == 'isgd':
self.apiurl = 'https://is.gd/'
else:
self.apiurl = 'https://v.gd/'
self.useragent = 'Mozilla/5.0 (compatible; pbincli - https://github.com/r4sas/pbincli/)'
def getlink(self, url):
# that is api -> function mapper for running service-related function when getlink() used
servicesList = {
'yourls': self._yourls,
'clckru': self._clckru,
'tinyurl': self._tinyurl,
'isgd': self._gd,
'vgd': self._gd,
'cuttly': self._cuttly
}
# run function selected by choosen API
servicesList[self.api](url)
def _yourls(self,url):
request = {'action': 'shorturl', 'format': 'json', 'url': url}
request.update(self.auth_args)
result = self.session.post(
url = self.apiurl,
proxies = self.proxy,
data = request)
try:
result.raise_for_status()
except HTTPError:
try:
response = result.json()
except ValueError:
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
else:
PBinCLIError("YOURLS: Received error from API: {} with JSON {}".format(result, response))
else:
response = result.json()
if {'status', 'statusCode', 'message'} <= set(response.keys()):
if response['status'] == 'fail':
PBinCLIError("YOURLS: Received error from API: {}".format(response['message']))
if not 'shorturl' in response:
PBinCLIError("YOURLS: Unknown error: {}".format(response['message']))
else:
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(response))
def _clckru(self, url):
request = {'url': url}
try:
result = self.session.post(
url = "https://clck.ru/--",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex))
def _tinyurl(self, url):
request = {'url': url}
try:
result = self.session.post(
url = "https://tinyurl.com/api-create.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex))
def _gd(self, url):
request = {
'format': 'json',
'url': url,
'logstats': 0 # we don't want use any statistics
}
headers = { 'User-Agent': self.useragent}
try:
result = self.session.post(
url = self.apiurl + "create.php",
headers = headers,
proxies = self.proxy,
data = request)
response = result.json()
if 'shorturl' in response:
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("{}: got error {} from API: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
response['errorcode'],
response['errormessage']))
except Exception as ex:
PBinCLIError("{}: unexcepted behavior: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
ex))
def _cuttly(self, url):
request = {
'url': url,
'domain': 0
}
try:
result = self.session.post(
url = "https://cutt.ly/scripts/shortenUrl.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex))

View File

@@ -3,7 +3,7 @@ import os, sys, argparse
import pbincli.actions import pbincli.actions
from pbincli.api import PrivateBin from pbincli.api import PrivateBin
from pbincli.utils import PBinCLIException from pbincli.utils import PBinCLIException, PBinCLIError, validate_url
CONFIG_PATHS = [os.path.join(".", "pbincli.conf", ), CONFIG_PATHS = [os.path.join(".", "pbincli.conf", ),
os.path.join(os.getenv("HOME") or "~", ".config", "pbincli", "pbincli.conf") ] os.path.join(os.getenv("HOME") or "~", ".config", "pbincli", "pbincli.conf") ]
@@ -13,8 +13,13 @@ def read_config(filename):
settings = {} settings = {}
with open(filename) as f: with open(filename) as f:
for l in f.readlines(): for l in f.readlines():
key, value = l.strip().split("=") if len(l.strip()) == 0:
settings[key.strip()] = value.strip() continue
try:
key, value = l.strip().split("=")
settings[key.strip()] = value.strip()
except ValueError:
PBinCLIError("Unable to parse config file, please check it for errors.")
return settings return settings
@@ -36,8 +41,21 @@ def main():
send_parser.add_argument("-q", "--notext", default=False, action="store_true", help="don't send text in paste") send_parser.add_argument("-q", "--notext", default=False, action="store_true", help="don't send text in paste")
send_parser.add_argument("-c", "--compression", default="zlib", action="store", send_parser.add_argument("-c", "--compression", default="zlib", action="store",
choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format") choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format")
send_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") ## URL shortener
send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") send_parser.add_argument("-S", "--short", default=False, action="store_true", help="use URL shortener")
send_parser.add_argument("--short-api", default=argparse.SUPPRESS, action="store",
choices=["tinyurl", "clckru", "isgd", "vgd", "cuttly", "yourls"], help="API used by shortener service")
send_parser.add_argument("--short-url", default=argparse.SUPPRESS, help="URL of shortener service API")
send_parser.add_argument("--short-user", default=argparse.SUPPRESS, help="Shortener username")
send_parser.add_argument("--short-pass", default=argparse.SUPPRESS, help="Shortener password")
send_parser.add_argument("--short-token", default=argparse.SUPPRESS, help="Shortener token")
## Connection options
send_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)")
send_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)")
send_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
send_parser.add_argument("--no-insecure-warning", default=False, action="store_true",
help="suppress InsecureRequestWarning (only with --no-check-certificate)")
##
send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
send_parser.add_argument("--dry", default=False, action="store_true", help="invoke dry run") send_parser.add_argument("--dry", default=False, action="store_true", help="invoke dry run")
send_parser.add_argument("stdin", help="input paste text from stdin", nargs="?", type=argparse.FileType("r"), default=sys.stdin) send_parser.add_argument("stdin", help="input paste text from stdin", nargs="?", type=argparse.FileType("r"), default=sys.stdin)
@@ -47,8 +65,13 @@ def main():
get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance") get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance")
get_parser.add_argument("pasteinfo", help="example: aabb#cccddd") get_parser.add_argument("pasteinfo", help="example: aabb#cccddd")
get_parser.add_argument("-p", "--password", help="password for decrypting paste") get_parser.add_argument("-p", "--password", help="password for decrypting paste")
get_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") ## Connection options
get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") get_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)")
get_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)")
get_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
get_parser.add_argument("--no-insecure-warning", default=False, action="store_true",
help="suppress InsecureRequestWarning (only with --no-check-certificate)")
##
get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
get_parser.set_defaults(func=pbincli.actions.get) get_parser.set_defaults(func=pbincli.actions.get)
@@ -56,8 +79,13 @@ def main():
delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token") delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token")
delete_parser.add_argument("-p", "--paste", required=True, help="paste id") delete_parser.add_argument("-p", "--paste", required=True, help="paste id")
delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token") delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token")
delete_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") ## Connection options
delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") delete_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)")
delete_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)")
delete_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true",
help="suppress InsecureRequestWarning (only with --no-check-certificate)")
##
delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
delete_parser.set_defaults(func=pbincli.actions.delete) delete_parser.set_defaults(func=pbincli.actions.delete)
@@ -65,10 +93,23 @@ def main():
args = parser.parse_args() args = parser.parse_args()
CONFIG = { CONFIG = {
"server": "https://paste.i2pd.xyz/", 'server': 'https://paste.i2pd.xyz/',
"proxy": None 'proxy': None,
'short_api': None,
'short_url': None,
'short_user': None,
'short_pass': None,
'short_token': None,
'no_check_certificate': False,
'no_insecure_warning': False
} }
# Configuration preference order:
# 1. Command line switches
# 2. Environment variables
# 3. Configuration file
# 4. Default values below
for p in CONFIG_PATHS: for p in CONFIG_PATHS:
if os.path.exists(p): if os.path.exists(p):
CONFIG.update(read_config(p)) CONFIG.update(read_config(p))
@@ -77,21 +118,21 @@ def main():
for key in CONFIG.keys(): for key in CONFIG.keys():
var = "PRIVATEBIN_{}".format(key.upper()) var = "PRIVATEBIN_{}".format(key.upper())
if var in os.environ: CONFIG[key] = os.getenv(var) if var in os.environ: CONFIG[key] = os.getenv(var)
# values from command line switches are preferred
args_var = vars(args)
if key in args_var:
CONFIG[key] = args_var[key]
SETTINGS = { # Re-validate PrivateBin instance URL
"proxy": CONFIG["proxy"], CONFIG['server'] = validate_url(CONFIG['server'])
"nocheckcert": args.no_check_certificate,
"noinsecurewarn": args.no_insecure_warning
}
api_client = PrivateBin(CONFIG["server"], settings=SETTINGS) api_client = PrivateBin(CONFIG)
if hasattr(args, "func"): if hasattr(args, "func"):
try: try:
args.func(args, api_client) args.func(args, api_client, settings=CONFIG)
except PBinCLIException as pe: except PBinCLIException as pe:
print("PBinCLI error: {}".format(pe)) raise PBinCLIException("error: {}".format(pe))
sys.exit(1)
else: else:
parser.print_help() parser.print_help()

View File

@@ -1,15 +1,33 @@
from Crypto.Random import get_random_bytes
from Crypto.Cipher import AES
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
from pbincli.utils import PBinCLIException from pbincli.utils import PBinCLIError
import zlib import zlib
# try import AES cipher and check if it has GCM mode (prevent usage of pycrypto)
try:
from Crypto.Cipher import AES
if not hasattr(AES, 'MODE_GCM'):
try:
from Cryptodome.Cipher import AES
from Cryptodome.Random import get_random_bytes
except ImportError:
PBinCLIError("AES GCM mode is not found in imported crypto module.\n" +
"That can happen if you have installed pycrypto.\n\n" +
"We tried to import pycryptodomex but it is not available.\n" +
"Please install it via pip, if you still need pycrypto, by running:\n" +
"\tpip install pycryptodomex\n" +
"... otherwise use separate python environment or uninstall pycrypto:\n" +
"\tpip uninstall pycrypto")
else:
from Crypto.Random import get_random_bytes
except ImportError:
PBinCLIError("Unable import pycryptodome")
CIPHER_ITERATION_COUNT = 100000 CIPHER_ITERATION_COUNT = 100000
CIPHER_SALT_BYTES = 8 CIPHER_SALT_BYTES = 8
CIPHER_BLOCK_BITS = 256 CIPHER_BLOCK_BITS = 256
CIPHER_BLOCK_BYTES = int(CIPHER_BLOCK_BITS/8) CIPHER_TAG_BITS = 128
CIPHER_TAG_BITS = int(CIPHER_BLOCK_BITS/2)
CIPHER_TAG_BYTES = int(CIPHER_TAG_BITS/8)
class Paste: class Paste:
def __init__(self, debug=False): def __init__(self, debug=False):
@@ -19,9 +37,13 @@ class Paste:
self._text = '' self._text = ''
self._attachment = '' self._attachment = ''
self._attachment_name = '' self._attachment_name = ''
self._key = get_random_bytes(CIPHER_BLOCK_BYTES)
self._password = '' self._password = ''
self._debug = debug self._debug = debug
self._iteration_count = CIPHER_ITERATION_COUNT
self._salt_bytes = CIPHER_SALT_BYTES
self._block_bits = CIPHER_BLOCK_BITS
self._tag_bits = CIPHER_TAG_BITS
self._key = get_random_bytes(int(self._block_bits / 8))
def setVersion(self, version): def setVersion(self, version):
@@ -105,8 +127,8 @@ class Paste:
return PBKDF2( return PBKDF2(
self._key + self._password.encode(), self._key + self._password.encode(),
salt, salt,
dkLen = CIPHER_BLOCK_BYTES, dkLen = int(self._block_bits / 8),
count = CIPHER_ITERATION_COUNT, count = self._iteration_count,
prf = lambda password, salt: HMAC.new( prf = lambda password, salt: HMAC.new(
password, password,
salt, salt,
@@ -115,10 +137,10 @@ class Paste:
@classmethod @classmethod
def __initializeCipher(self, key, iv, adata): def __initializeCipher(self, key, iv, adata, tagsize):
from pbincli.utils import json_encode from pbincli.utils import json_encode
cipher = AES.new(key, AES.MODE_GCM, nonce=iv, mac_len=CIPHER_TAG_BYTES) cipher = AES.new(key, AES.MODE_GCM, nonce=iv, mac_len=tagsize)
cipher.update(json_encode(adata)) cipher.update(json_encode(adata))
return cipher return cipher
@@ -143,7 +165,7 @@ class Paste:
elif self._version == 1: elif self._version == 1:
return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS) return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
else: else:
raise PBinCLIException('Unknown compression type provided in paste!') PBinCLIError('Unknown compression type provided in paste!')
def __compress(self, s): def __compress(self, s):
@@ -160,115 +182,141 @@ class Paste:
b = co.compress(s) + co.flush() b = co.compress(s) + co.flush()
return b64encode(''.join(map(chr, b)).encode('utf-8')) return b64encode(''.join(map(chr, b)).encode('utf-8'))
else: else:
raise PBinCLIException('Unknown compression type provided!') PBinCLIError('Unknown compression type provided!')
def decrypt(self): def decrypt(self):
# that is wrapper which running needed function regrading to paste version
if self._version == 2: self._decryptV2()
else: self._decryptV1()
def _decryptV2(self):
from json import loads as json_decode
iv = b64decode(self._data['adata'][0][0])
salt = b64decode(self._data['adata'][0][1])
self._iteration_count = self._data['adata'][0][2]
self._block_bits = self._data['adata'][0][3]
self._tag_bits = self._data['adata'][0][4]
cipher_tag_bytes = int(self._tag_bits / 8)
key = self.__deriveKey(salt)
# Get compression type from received paste
self._compression = self._data['adata'][0][7]
cipher = self.__initializeCipher(key, iv, self._data['adata'], cipher_tag_bytes)
# Cut the cipher text into message and tag
cipher_text_tag = b64decode(self._data['ct'])
cipher_text = cipher_text_tag[:-cipher_tag_bytes]
cipher_tag = cipher_text_tag[-cipher_tag_bytes:]
cipher_message = json_decode(self.__decompress(cipher.decrypt_and_verify(cipher_text, cipher_tag)).decode())
self._text = cipher_message['paste'].encode()
if 'attachment' in cipher_message and 'attachment_name' in cipher_message:
self._attachment = cipher_message['attachment']
self._attachment_name = cipher_message['attachment_name']
def _decryptV1(self):
from sjcl import SJCL
from json import loads as json_decode from json import loads as json_decode
if self._version == 2: password = self.__preparePassKey()
iv = b64decode(self._data['adata'][0][0]) cipher_text = json_decode(self._data['data'])
salt = b64decode(self._data['adata'][0][1]) if self._debug: print("Text:\t{}\n".format(cipher_text))
key = self.__deriveKey(salt)
# Get compression type from received paste text = SJCL().decrypt(cipher_text, password)
self._compression = self._data['adata'][0][7]
cipher = self.__initializeCipher(key, iv, self._data['adata']) if len(text):
# Cut the cipher text into message and tag if self._debug: print("Decoded Text:\t{}\n".format(text))
cipher_text_tag = b64decode(self._data['ct']) self._text = self.__decompress(text.decode())
cipher_text = cipher_text_tag[:-CIPHER_TAG_BYTES]
cipher_tag = cipher_text_tag[-CIPHER_TAG_BYTES:]
cipher_message = json_decode(self.__decompress(cipher.decrypt_and_verify(cipher_text, cipher_tag)).decode())
self._text = cipher_message['paste'].encode() if 'attachment' in self._data and 'attachmentname' in self._data:
cipherfile = json_decode(self._data['attachment'])
cipherfilename = json_decode(self._data['attachmentname'])
if 'attachment' in cipher_message and 'attachment_name' in cipher_message: if self._debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile))
self._attachment = cipher_message['attachment']
self._attachment_name = cipher_message['attachment_name']
else:
from sjcl import SJCL
password = self.__preparePassKey() attachment = SJCL().decrypt(cipherfile, password)
attachmentname = SJCL().decrypt(cipherfilename, password)
cipher_text = json_decode(self._data['data']) self._attachment = self.__decompress(attachment.decode('utf-8')).decode('utf-8')
self._attachment_name = self.__decompress(attachmentname.decode('utf-8')).decode('utf-8')
if self._debug: print("Text:\t{}\n".format(cipher_text))
text = SJCL().decrypt(cipher_text, password)
if len(text):
if self._debug: print("Decoded Text:\t{}\n".format(text))
self._text = self.__decompress(text.decode())
if 'attachment' in self._data and 'attachmentname' in self._data:
cipherfile = json_decode(self._data['attachment'])
cipherfilename = json_decode(self._data['attachmentname'])
if self._debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile))
attachment = SJCL().decrypt(cipherfile, password)
attachmentname = SJCL().decrypt(cipherfilename, password)
self._attachment = self.__decompress(attachment.decode('utf-8')).decode('utf-8')
self._attachment_name = self.__decompress(attachmentname.decode('utf-8')).decode('utf-8')
def encrypt(self, formatter, burnafterreading, discussion, expiration): def encrypt(self, formatter, burnafterreading, discussion, expiration):
# that is wrapper which running needed function regrading to paste version
self._formatter = formatter
self._burnafterreading = burnafterreading
self._discussion = discussion
self._expiration = expiration
if self._version == 2: self._encryptV2()
else: self._encryptV1()
def _encryptV2(self):
from pbincli.utils import json_encode from pbincli.utils import json_encode
if self._version == 2:
iv = get_random_bytes(CIPHER_TAG_BYTES)
salt = get_random_bytes(CIPHER_SALT_BYTES)
key = self.__deriveKey(salt)
# prepare encryption authenticated data and message iv = get_random_bytes(int(self._tag_bits / 8))
adata = [ salt = get_random_bytes(self._salt_bytes)
[ key = self.__deriveKey(salt)
b64encode(iv).decode(),
b64encode(salt).decode(),
CIPHER_ITERATION_COUNT,
CIPHER_BLOCK_BITS,
CIPHER_TAG_BITS,
'aes',
'gcm',
self._compression
],
formatter,
int(burnafterreading),
int(discussion)
]
cipher_message = {'paste':self._text}
if self._attachment:
cipher_message['attachment'] = self._attachment
cipher_message['attachment_name'] = self._attachment_name
cipher = self.__initializeCipher(key, iv, adata) # prepare encryption authenticated data and message
ciphertext, tag = cipher.encrypt_and_digest(self.__compress(json_encode(cipher_message))) adata = [
[
b64encode(iv).decode(),
b64encode(salt).decode(),
self._iteration_count,
self._block_bits,
self._tag_bits,
'aes',
'gcm',
self._compression
],
self._formatter,
int(self._discussion),
int(self._burnafterreading)
]
cipher_message = {'paste':self._text}
if self._attachment:
cipher_message['attachment'] = self._attachment
cipher_message['attachment_name'] = self._attachment_name
self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':expiration}} cipher = self.__initializeCipher(key, iv, adata, int(self._tag_bits /8 ))
ciphertext, tag = cipher.encrypt_and_digest(self.__compress(json_encode(cipher_message)))
else: if self._debug: print("PBKDF2 Key:\t{}\nCipherText:\t{}\nCipherTag:\t{}"
from sjcl import SJCL .format(b64encode(key), b64encode(ciphertext), b64encode(tag)))
self._data = {'expire':expiration,'formatter':formatter,'burnafterreading':int(burnafterreading),'opendiscussion':int(discussion)} self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':self._expiration}}
password = self.__preparePassKey()
if self._debug: print("Password:\t{}".format(password)) def _encryptV1(self):
from sjcl import SJCL
from pbincli.utils import json_encode
# Encrypting text self._data = {'expire':self._expiration,'formatter':self._formatter,
cipher = SJCL().encrypt(self.__compress(self._text.encode('utf-8')), password, mode='gcm') 'burnafterreading':int(self._burnafterreading),'opendiscussion':int(self._discussion)}
for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode()
self._data['data'] = json_encode(cipher) password = self.__preparePassKey()
if self._debug: print("Password:\t{}".format(password))
if self._attachment: # Encrypting text
cipherfile = SJCL().encrypt(self.__compress(self._attachment.encode('utf-8')), password, mode='gcm') cipher = SJCL().encrypt(self.__compress(self._text.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode() for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode()
cipherfilename = SJCL().encrypt(self.__compress(self._attachment_name.encode('utf-8')), password, mode='gcm') self._data['data'] = json_encode(cipher)
for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode()
self._data['attachment'] = json_encode(cipherfile) if self._attachment:
self._data['attachmentname'] = json_encode(cipherfilename) cipherfile = SJCL().encrypt(self.__compress(self._attachment.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode()
cipherfilename = SJCL().encrypt(self.__compress(self._attachment_name.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode()
self._data['attachment'] = json_encode(cipherfile)
self._data['attachmentname'] = json_encode(cipherfilename)

View File

@@ -1,9 +1,14 @@
import json, ntpath, os import json, ntpath, os, sys
class PBinCLIException(Exception): class PBinCLIException(Exception):
pass pass
def PBinCLIError(message):
print("PBinCLI Error: {}".format(message), file=sys.stderr)
exit(1)
def path_leaf(path): def path_leaf(path):
head, tail = ntpath.split(path) head, tail = ntpath.split(path)
return tail or ntpath.basename(head) return tail or ntpath.basename(head)
@@ -12,14 +17,20 @@ def path_leaf(path):
def check_readable(f): def check_readable(f):
# Checks if path exists and readable # Checks if path exists and readable
if not os.path.exists(f) or not os.access(f, os.R_OK): if not os.path.exists(f) or not os.access(f, os.R_OK):
raise PBinCLIException("Error accessing path: {}".format(f)) PBinCLIError("Error accessing path: {}".format(f))
def check_writable(f): def check_writable(f):
# Checks if path is writable # Checks if path is writable
if not os.access(os.path.dirname(f) or ".", os.W_OK): if not os.access(os.path.dirname(f) or ".", os.W_OK):
raise PBinCLIException("Path is not writable: {}".format(f)) PBinCLIError("Path is not writable: {}".format(f))
def json_encode(s): def json_encode(s):
return json.dumps(s, separators=(',',':')).encode() return json.dumps(s, separators=(',',':')).encode()
def validate_url(s):
if not s.endswith('/'):
s = s + "/"
return s

2
setup.cfg Normal file
View File

@@ -0,0 +1,2 @@
[metadata]
license_files = LICENSE

View File

@@ -17,7 +17,7 @@ setup(
long_description_content_type='text/x-rst', long_description_content_type='text/x-rst',
author='R4SAS', author='R4SAS',
author_email='r4sas@i2pmail.org', author_email='r4sas@i2pmail.org',
url='https://github.com/r4sas/PBinCLI', url='https://github.com/r4sas/PBinCLI/',
keywords='privatebin cryptography security', keywords='privatebin cryptography security',
license='MIT', license='MIT',
classifiers=[ classifiers=[
@@ -31,9 +31,14 @@ setup(
], ],
packages=['pbincli'], packages=['pbincli'],
install_requires=install_requires, install_requires=install_requires,
python_requires='>=3',
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'pbincli=pbincli.cli:main', 'pbincli=pbincli.cli:main',
], ],
} },
project_urls={
'Bug Reports': 'https://github.com/r4sas/PBinCLI/issues',
'Source': 'https://github.com/r4sas/PBinCLI/',
},
) )