Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
682b47fbd3 | ||
|
|
9d82c727b6 | ||
|
|
c425d86ed6 | ||
|
|
3ed06686ab | ||
|
|
7e4fb0a8c4 | ||
|
|
ebfe0c48a0 | ||
|
|
45d854e590 | ||
|
|
18d79c8e04 | ||
|
|
92c38344e3 | ||
|
|
b596f42b7e | ||
|
|
86c5051fcf | ||
|
|
f838f8ee94 | ||
|
|
fb7a93732d | ||
|
|
181763070c | ||
|
|
c3a491ac46 | ||
|
|
641c55a6a2 | ||
|
|
432675f2e6 | ||
|
|
635c87dabd | ||
|
|
6b6c33e545 | ||
|
|
7c5ba2fdbe | ||
|
|
692335ee62 | ||
|
|
c63256c628 | ||
|
|
66659da66d |
3
.gitattributes
vendored
Normal file
3
.gitattributes
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
.gitattributes export-ignore
|
||||
.gitignore export-ignore
|
||||
README.md export-ignore
|
||||
@@ -2,6 +2,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
__author__ = "R4SAS <r4sas@i2pmail.org>"
|
||||
__version__ = "0.2"
|
||||
__version__ = "0.3.0"
|
||||
__copyright__ = "Copyright (c) R4SAS"
|
||||
__license__ = "MIT"
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
from pbincli.format import Paste
|
||||
from pbincli.utils import PBinCLIError
|
||||
|
||||
def send(args, api_client, settings=None):
|
||||
from pbincli.api import Shortener
|
||||
if args.short:
|
||||
shortener = Shortener(settings)
|
||||
|
||||
def send(args, api_client):
|
||||
if not args.notext:
|
||||
if args.text:
|
||||
text = args.text
|
||||
elif args.stdin:
|
||||
text = args.stdin.read()
|
||||
elif not args.file:
|
||||
print("Nothing to send!")
|
||||
exit(1)
|
||||
PBinCLIError("Nothing to send!")
|
||||
else:
|
||||
text = ""
|
||||
|
||||
@@ -60,29 +64,32 @@ def send(args, api_client):
|
||||
result['id'],
|
||||
passphrase,
|
||||
result['deletetoken'],
|
||||
api_client.server,
|
||||
settings['server'],
|
||||
result['id'],
|
||||
passphrase))
|
||||
elif result['status']: # return code is other then zero
|
||||
print("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||
exit(1)
|
||||
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||
else: # or here no status field in response or it is empty
|
||||
print("Something went wrong...\nError: Empty response.")
|
||||
exit(1)
|
||||
PBinCLIError("Something went wrong...\nError: Empty response.")
|
||||
|
||||
if args.short:
|
||||
print("\nQuerying URL shortening service...")
|
||||
shortener.getlink("{}?{}#{}".format(
|
||||
settings['server'],
|
||||
result['id'],
|
||||
passphrase))
|
||||
|
||||
|
||||
def get(args, api_client):
|
||||
def get(args, api_client, settings=None):
|
||||
from pbincli.utils import check_writable, json_encode
|
||||
|
||||
try:
|
||||
pasteid, passphrase = args.pasteinfo.split("#")
|
||||
except ValueError:
|
||||
print("PBinCLI error: provided info hasn't contain valid PasteID#Passphrase string")
|
||||
exit(1)
|
||||
PBinCLIError("Provided info hasn't contain valid PasteID#Passphrase string")
|
||||
|
||||
if not (pasteid and passphrase):
|
||||
print("PBinCLI error: Incorrect request")
|
||||
exit(1)
|
||||
PBinCLIError("Incorrect request")
|
||||
|
||||
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
|
||||
|
||||
@@ -139,14 +146,12 @@ def get(args, api_client):
|
||||
api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'}))
|
||||
|
||||
elif result['status']: # return code is other then zero
|
||||
print("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||
exit(1)
|
||||
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||
else: # or here no status field in response or it is empty
|
||||
print("Something went wrong...\nError: Empty response.")
|
||||
exit(1)
|
||||
PBinCLIError("Something went wrong...\nError: Empty response.")
|
||||
|
||||
|
||||
def delete(args, api_client):
|
||||
def delete(args, api_client, settings=None):
|
||||
from pbincli.utils import json_encode
|
||||
|
||||
pasteid = args.paste
|
||||
|
||||
206
pbincli/api.py
206
pbincli/api.py
@@ -1,21 +1,29 @@
|
||||
import requests
|
||||
from requests import HTTPError
|
||||
from pbincli.utils import PBinCLIError
|
||||
|
||||
class PrivateBin:
|
||||
def __init__(self, server, settings=None):
|
||||
self.server = server
|
||||
self.headers = {'X-Requested-With': 'JSONHttpRequest'}
|
||||
|
||||
def _config_requests(settings=None):
|
||||
if settings['proxy']:
|
||||
self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
|
||||
proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
|
||||
else:
|
||||
self.proxy = {}
|
||||
proxy = {}
|
||||
|
||||
if settings['noinsecurewarn']:
|
||||
if settings['no_insecure_warning']:
|
||||
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
||||
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||||
|
||||
self.session = requests.Session()
|
||||
self.session.verify = settings['nocheckcert']
|
||||
session = requests.Session()
|
||||
session.verify = not settings['no_check_certificate']
|
||||
|
||||
return session, proxy
|
||||
|
||||
|
||||
class PrivateBin:
|
||||
def __init__(self, settings=None):
|
||||
self.server = settings['server']
|
||||
self.headers = {'X-Requested-With': 'JSONHttpRequest'}
|
||||
|
||||
self.session, self.proxy = _config_requests(settings)
|
||||
|
||||
def post(self, request):
|
||||
result = self.session.post(
|
||||
@@ -27,8 +35,7 @@ class PrivateBin:
|
||||
try:
|
||||
return result.json()
|
||||
except ValueError:
|
||||
print("ERROR: Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text))
|
||||
exit(1)
|
||||
PBinCLIError("Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text))
|
||||
|
||||
|
||||
def get(self, request):
|
||||
@@ -56,11 +63,9 @@ class PrivateBin:
|
||||
if not result['status']:
|
||||
print("Paste successfully deleted!")
|
||||
elif result['status']:
|
||||
print("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||
exit(1)
|
||||
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||
else:
|
||||
print("Something went wrong...\nError: Empty response.")
|
||||
exit(1)
|
||||
PBinCLIError("Something went wrong...\nError: Empty response.")
|
||||
|
||||
|
||||
def getVersion(self):
|
||||
@@ -72,3 +77,172 @@ class PrivateBin:
|
||||
'v' in jsonldSchema['@context'] and
|
||||
'@value' in jsonldSchema['@context']['v']) \
|
||||
else 1
|
||||
|
||||
class Shortener:
|
||||
"""Some parts of this class was taken from
|
||||
python-yourls (https://github.com/tflink/python-yourls/) library
|
||||
"""
|
||||
def __init__(self, settings=None):
|
||||
self.api = settings['short_api']
|
||||
|
||||
# we checking which service is used, because some services doesn't require
|
||||
# any authentication, or have only one domain on which it working
|
||||
if self.api == 'yourls':
|
||||
self._yourls_init(settings)
|
||||
elif self.api == 'isgd' or self.api == 'vgd':
|
||||
self._gd_init()
|
||||
|
||||
self.session, self.proxy = _config_requests(settings)
|
||||
|
||||
|
||||
def _yourls_init(self, settings):
|
||||
if not settings['short_url']:
|
||||
PBinCLIError("YOURLS: An API URL is required")
|
||||
|
||||
# setting API URL
|
||||
apiurl = settings['short_url']
|
||||
if apiurl.endswith('/yourls-api.php'):
|
||||
self.apiurl = apiurl
|
||||
elif apiurl.endswith('/'):
|
||||
self.apiurl = apiurl + 'yourls-api.php'
|
||||
else:
|
||||
PBinCLIError("YOURLS: Incorrect URL is provided.\n" +
|
||||
"It must contain full address to 'yourls-api.php' script (like https://example.com/yourls-api.php)\n" +
|
||||
"or just contain instance URL with '/' at the end (like https://example.com/)")
|
||||
|
||||
# validating for required credentials
|
||||
if settings['short_user'] and settings['short_pass'] and settings['short_token'] is None:
|
||||
self.auth_args = {'username': settings['short_user'], 'password': settings['short_pass']}
|
||||
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token']:
|
||||
self.auth_args = {'signature': settings['short_token']}
|
||||
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token'] is None:
|
||||
self.auth_args = {}
|
||||
else:
|
||||
PBinCLIError("YOURLS: either username and password or token are required. Otherwise set to default (None)")
|
||||
|
||||
|
||||
def _gd_init(self):
|
||||
if self.api == 'isgd':
|
||||
self.apiurl = 'https://is.gd/'
|
||||
else:
|
||||
self.apiurl = 'https://v.gd/'
|
||||
self.useragent = 'Mozilla/5.0 (compatible; pbincli - https://github.com/r4sas/pbincli/)'
|
||||
|
||||
|
||||
def getlink(self, url):
|
||||
# that is api -> function mapper for running service-related function when getlink() used
|
||||
servicesList = {
|
||||
'yourls': self._yourls,
|
||||
'clckru': self._clckru,
|
||||
'tinyurl': self._tinyurl,
|
||||
'isgd': self._gd,
|
||||
'vgd': self._gd,
|
||||
'cuttly': self._cuttly
|
||||
}
|
||||
# run function selected by choosen API
|
||||
servicesList[self.api](url)
|
||||
|
||||
|
||||
def _yourls(self,url):
|
||||
request = {'action': 'shorturl', 'format': 'json', 'url': url}
|
||||
request.update(self.auth_args)
|
||||
|
||||
result = self.session.post(
|
||||
url = self.apiurl,
|
||||
proxies = self.proxy,
|
||||
data = request)
|
||||
|
||||
try:
|
||||
result.raise_for_status()
|
||||
except HTTPError:
|
||||
try:
|
||||
response = result.json()
|
||||
except ValueError:
|
||||
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
|
||||
else:
|
||||
PBinCLIError("YOURLS: Received error from API: {} with JSON {}".format(result, response))
|
||||
else:
|
||||
response = result.json()
|
||||
|
||||
if {'status', 'statusCode', 'message'} <= set(response.keys()):
|
||||
if response['status'] == 'fail':
|
||||
PBinCLIError("YOURLS: Received error from API: {}".format(response['message']))
|
||||
if not 'shorturl' in response:
|
||||
PBinCLIError("YOURLS: Unknown error: {}".format(response['message']))
|
||||
else:
|
||||
print("Short Link:\t{}".format(response['shorturl']))
|
||||
else:
|
||||
PBinCLIError("YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(response))
|
||||
|
||||
|
||||
def _clckru(self, url):
|
||||
request = {'url': url}
|
||||
|
||||
try:
|
||||
result = self.session.post(
|
||||
url = "https://clck.ru/--",
|
||||
proxies = self.proxy,
|
||||
data = request)
|
||||
print("Short Link:\t{}".format(result.text))
|
||||
except Exception as ex:
|
||||
PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex))
|
||||
|
||||
|
||||
def _tinyurl(self, url):
|
||||
request = {'url': url}
|
||||
|
||||
try:
|
||||
result = self.session.post(
|
||||
url = "https://tinyurl.com/api-create.php",
|
||||
proxies = self.proxy,
|
||||
data = request)
|
||||
print("Short Link:\t{}".format(result.text))
|
||||
except Exception as ex:
|
||||
PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex))
|
||||
|
||||
|
||||
def _gd(self, url):
|
||||
request = {
|
||||
'format': 'json',
|
||||
'url': url,
|
||||
'logstats': 0 # we don't want use any statistics
|
||||
}
|
||||
headers = { 'User-Agent': self.useragent}
|
||||
|
||||
try:
|
||||
result = self.session.post(
|
||||
url = self.apiurl + "create.php",
|
||||
headers = headers,
|
||||
proxies = self.proxy,
|
||||
data = request)
|
||||
|
||||
response = result.json()
|
||||
|
||||
if 'shorturl' in response:
|
||||
print("Short Link:\t{}".format(response['shorturl']))
|
||||
else:
|
||||
PBinCLIError("{}: got error {} from API: {}".format(
|
||||
"is.gd" if self.api == 'isgd' else 'v.gd',
|
||||
response['errorcode'],
|
||||
response['errormessage']))
|
||||
|
||||
except Exception as ex:
|
||||
PBinCLIError("{}: unexcepted behavior: {}".format(
|
||||
"is.gd" if self.api == 'isgd' else 'v.gd',
|
||||
ex))
|
||||
|
||||
|
||||
def _cuttly(self, url):
|
||||
request = {
|
||||
'url': url,
|
||||
'domain': 0
|
||||
}
|
||||
|
||||
try:
|
||||
result = self.session.post(
|
||||
url = "https://cutt.ly/scripts/shortenUrl.php",
|
||||
proxies = self.proxy,
|
||||
data = request)
|
||||
print("Short Link:\t{}".format(result.text))
|
||||
except Exception as ex:
|
||||
PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex))
|
||||
|
||||
@@ -3,7 +3,7 @@ import os, sys, argparse
|
||||
|
||||
import pbincli.actions
|
||||
from pbincli.api import PrivateBin
|
||||
from pbincli.utils import PBinCLIException
|
||||
from pbincli.utils import PBinCLIException, validate_url
|
||||
|
||||
CONFIG_PATHS = [os.path.join(".", "pbincli.conf", ),
|
||||
os.path.join(os.getenv("HOME") or "~", ".config", "pbincli", "pbincli.conf") ]
|
||||
@@ -36,8 +36,19 @@ def main():
|
||||
send_parser.add_argument("-q", "--notext", default=False, action="store_true", help="don't send text in paste")
|
||||
send_parser.add_argument("-c", "--compression", default="zlib", action="store",
|
||||
choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format")
|
||||
send_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation")
|
||||
# URL shortener
|
||||
send_parser.add_argument("-S", "--short", default=False, action="store_true", help="use URL shortener")
|
||||
send_parser.add_argument("--short-api", default=argparse.SUPPRESS, action="store", choices=["tinyurl", "clckru", "isgd", "vgd", "cuttly", "yourls"], help="API used by shortener service")
|
||||
send_parser.add_argument("--short-url", default=argparse.SUPPRESS, help="URL of shortener service API")
|
||||
send_parser.add_argument("--short-user", default=argparse.SUPPRESS, help="Shortener username")
|
||||
send_parser.add_argument("--short-pass", default=argparse.SUPPRESS, help="Shortener password")
|
||||
send_parser.add_argument("--short-token", default=argparse.SUPPRESS, help="Shortener token")
|
||||
# Connection options
|
||||
send_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)")
|
||||
send_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)")
|
||||
send_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
|
||||
send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
|
||||
#
|
||||
send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
|
||||
send_parser.add_argument("--dry", default=False, action="store_true", help="invoke dry run")
|
||||
send_parser.add_argument("stdin", help="input paste text from stdin", nargs="?", type=argparse.FileType("r"), default=sys.stdin)
|
||||
@@ -47,7 +58,7 @@ def main():
|
||||
get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance")
|
||||
get_parser.add_argument("pasteinfo", help="example: aabb#cccddd")
|
||||
get_parser.add_argument("-p", "--password", help="password for decrypting paste")
|
||||
get_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation")
|
||||
get_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
|
||||
get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
|
||||
get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
|
||||
get_parser.set_defaults(func=pbincli.actions.get)
|
||||
@@ -56,7 +67,7 @@ def main():
|
||||
delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token")
|
||||
delete_parser.add_argument("-p", "--paste", required=True, help="paste id")
|
||||
delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token")
|
||||
delete_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation")
|
||||
delete_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
|
||||
delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
|
||||
delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
|
||||
delete_parser.set_defaults(func=pbincli.actions.delete)
|
||||
@@ -65,10 +76,23 @@ def main():
|
||||
args = parser.parse_args()
|
||||
|
||||
CONFIG = {
|
||||
"server": "https://paste.i2pd.xyz/",
|
||||
"proxy": None
|
||||
'server': 'https://paste.i2pd.xyz/',
|
||||
'proxy': None,
|
||||
'short_api': None,
|
||||
'short_url': None,
|
||||
'short_user': None,
|
||||
'short_pass': None,
|
||||
'short_token': None,
|
||||
'no_check_certificate': False,
|
||||
'no_insecure_warning': False
|
||||
}
|
||||
|
||||
# Configuration preference order:
|
||||
# 1. Command line switches
|
||||
# 2. Environment variables
|
||||
# 3. Configuration file
|
||||
# 4. Default values below
|
||||
|
||||
for p in CONFIG_PATHS:
|
||||
if os.path.exists(p):
|
||||
CONFIG.update(read_config(p))
|
||||
@@ -77,21 +101,21 @@ def main():
|
||||
for key in CONFIG.keys():
|
||||
var = "PRIVATEBIN_{}".format(key.upper())
|
||||
if var in os.environ: CONFIG[key] = os.getenv(var)
|
||||
# values from command line switches are preferred
|
||||
args_var = vars(args)
|
||||
if key in args_var:
|
||||
CONFIG[key] = args_var[key]
|
||||
|
||||
SETTINGS = {
|
||||
"proxy": CONFIG["proxy"],
|
||||
"nocheckcert": args.no_check_certificate,
|
||||
"noinsecurewarn": args.no_insecure_warning
|
||||
}
|
||||
# Re-validate PrivateBin instance URL
|
||||
CONFIG['server'] = validate_url(CONFIG['server'])
|
||||
|
||||
api_client = PrivateBin(CONFIG["server"], settings=SETTINGS)
|
||||
api_client = PrivateBin(CONFIG)
|
||||
|
||||
if hasattr(args, "func"):
|
||||
try:
|
||||
args.func(args, api_client)
|
||||
args.func(args, api_client, settings=CONFIG)
|
||||
except PBinCLIException as pe:
|
||||
print("PBinCLI error: {}".format(pe))
|
||||
sys.exit(1)
|
||||
raise PBinCLIException("error: {}".format(pe))
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from Crypto.Random import get_random_bytes
|
||||
from Crypto.Cipher import AES
|
||||
from base64 import b64encode, b64decode
|
||||
from pbincli.utils import PBinCLIException
|
||||
from pbincli.utils import PBinCLIError
|
||||
import zlib
|
||||
|
||||
CIPHER_ITERATION_COUNT = 100000
|
||||
@@ -143,7 +143,7 @@ class Paste:
|
||||
elif self._version == 1:
|
||||
return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
|
||||
else:
|
||||
raise PBinCLIException('Unknown compression type provided in paste!')
|
||||
PBinCLIError('Unknown compression type provided in paste!')
|
||||
|
||||
|
||||
def __compress(self, s):
|
||||
@@ -160,13 +160,17 @@ class Paste:
|
||||
b = co.compress(s) + co.flush()
|
||||
return b64encode(''.join(map(chr, b)).encode('utf-8'))
|
||||
else:
|
||||
raise PBinCLIException('Unknown compression type provided!')
|
||||
PBinCLIError('Unknown compression type provided!')
|
||||
|
||||
|
||||
def decrypt(self):
|
||||
from json import loads as json_decode
|
||||
# that is wrapper which running needed function regrading to paste version
|
||||
if self._version == 2: self._decryptV2()
|
||||
else: self._decryptV1()
|
||||
|
||||
if self._version == 2:
|
||||
|
||||
def _decryptV2(self):
|
||||
from json import loads as json_decode
|
||||
iv = b64decode(self._data['adata'][0][0])
|
||||
salt = b64decode(self._data['adata'][0][1])
|
||||
key = self.__deriveKey(salt)
|
||||
@@ -186,13 +190,14 @@ class Paste:
|
||||
if 'attachment' in cipher_message and 'attachment_name' in cipher_message:
|
||||
self._attachment = cipher_message['attachment']
|
||||
self._attachment_name = cipher_message['attachment_name']
|
||||
else:
|
||||
|
||||
|
||||
def _decryptV1(self):
|
||||
from sjcl import SJCL
|
||||
from json import loads as json_decode
|
||||
|
||||
password = self.__preparePassKey()
|
||||
|
||||
cipher_text = json_decode(self._data['data'])
|
||||
|
||||
if self._debug: print("Text:\t{}\n".format(cipher_text))
|
||||
|
||||
text = SJCL().decrypt(cipher_text, password)
|
||||
@@ -215,8 +220,19 @@ class Paste:
|
||||
|
||||
|
||||
def encrypt(self, formatter, burnafterreading, discussion, expiration):
|
||||
# that is wrapper which running needed function regrading to paste version
|
||||
self._formatter = formatter
|
||||
self._burnafterreading = burnafterreading
|
||||
self._discussion = discussion
|
||||
self._expiration = expiration
|
||||
|
||||
if self._version == 2: self._encryptV2()
|
||||
else: self._encryptV1()
|
||||
|
||||
|
||||
def _encryptV2(self):
|
||||
from pbincli.utils import json_encode
|
||||
if self._version == 2:
|
||||
|
||||
iv = get_random_bytes(CIPHER_TAG_BYTES)
|
||||
salt = get_random_bytes(CIPHER_SALT_BYTES)
|
||||
key = self.__deriveKey(salt)
|
||||
@@ -233,9 +249,9 @@ class Paste:
|
||||
'gcm',
|
||||
self._compression
|
||||
],
|
||||
formatter,
|
||||
int(burnafterreading),
|
||||
int(discussion)
|
||||
self._formatter,
|
||||
int(self._discussion),
|
||||
int(self._burnafterreading)
|
||||
]
|
||||
cipher_message = {'paste':self._text}
|
||||
if self._attachment:
|
||||
@@ -245,15 +261,16 @@ class Paste:
|
||||
cipher = self.__initializeCipher(key, iv, adata)
|
||||
ciphertext, tag = cipher.encrypt_and_digest(self.__compress(json_encode(cipher_message)))
|
||||
|
||||
self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':expiration}}
|
||||
self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':self._expiration}}
|
||||
|
||||
else:
|
||||
|
||||
def _encryptV1(self):
|
||||
from sjcl import SJCL
|
||||
from pbincli.utils import json_encode
|
||||
|
||||
self._data = {'expire':expiration,'formatter':formatter,'burnafterreading':int(burnafterreading),'opendiscussion':int(discussion)}
|
||||
self._data = {'expire':self._expiration,'formatter':self._formatter,'burnafterreading':int(self._burnafterreading),'opendiscussion':int(self._discussion)}
|
||||
|
||||
password = self.__preparePassKey()
|
||||
|
||||
if self._debug: print("Password:\t{}".format(password))
|
||||
|
||||
# Encrypting text
|
||||
@@ -271,4 +288,3 @@ class Paste:
|
||||
|
||||
self._data['attachment'] = json_encode(cipherfile)
|
||||
self._data['attachmentname'] = json_encode(cipherfilename)
|
||||
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
import json, ntpath, os
|
||||
import json, ntpath, os, sys
|
||||
|
||||
class PBinCLIException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def PBinCLIError(message):
|
||||
print("PBinCLI Error: {}".format(message), file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
|
||||
def path_leaf(path):
|
||||
head, tail = ntpath.split(path)
|
||||
return tail or ntpath.basename(head)
|
||||
@@ -12,14 +17,20 @@ def path_leaf(path):
|
||||
def check_readable(f):
|
||||
# Checks if path exists and readable
|
||||
if not os.path.exists(f) or not os.access(f, os.R_OK):
|
||||
raise PBinCLIException("Error accessing path: {}".format(f))
|
||||
PBinCLIError("Error accessing path: {}".format(f))
|
||||
|
||||
|
||||
def check_writable(f):
|
||||
# Checks if path is writable
|
||||
if not os.access(os.path.dirname(f) or ".", os.W_OK):
|
||||
raise PBinCLIException("Path is not writable: {}".format(f))
|
||||
PBinCLIError("Path is not writable: {}".format(f))
|
||||
|
||||
|
||||
def json_encode(s):
|
||||
return json.dumps(s, separators=(',',':')).encode()
|
||||
|
||||
|
||||
def validate_url(s):
|
||||
if not s.endswith('/'):
|
||||
s = s + "/"
|
||||
return s
|
||||
|
||||
Reference in New Issue
Block a user