Browse Source

sql: test read-write permissions for given path and raise early

maybe fix #6485
patch-4
SomberNight 4 years ago
parent
commit
36178df875
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 3
      electrum/sql_db.py
  2. 30
      electrum/storage.py
  3. 22
      electrum/util.py

3
electrum/sql_db.py

@ -6,6 +6,7 @@ import asyncio
import sqlite3
from .logging import Logger
from .util import test_read_write_permissions
def sql(func):
@ -17,12 +18,14 @@ def sql(func):
return f
return wrapper
class SqlDB(Logger):
def __init__(self, asyncio_loop, path, commit_interval=None):
Logger.__init__(self)
self.asyncio_loop = asyncio_loop
self.path = path
test_read_write_permissions(path)
self.commit_interval = commit_interval
self.db_requests = queue.Queue()
self.sql_thread = threading.Thread(target=self.run_sql)

30
electrum/storage.py

@ -31,7 +31,8 @@ import zlib
from enum import IntEnum
from . import ecc
from .util import profiler, InvalidPassword, WalletFileException, bfh, standardize_path
from .util import (profiler, InvalidPassword, WalletFileException, bfh, standardize_path,
test_read_write_permissions)
from .wallet_db import WalletDB
from .logging import Logger
@ -62,7 +63,10 @@ class WalletStorage(Logger):
self.logger.info(f"wallet path {self.path}")
self.pubkey = None
self.decrypted = ''
self._test_read_write_permissions(self.path)
try:
test_read_write_permissions(self.path)
except IOError as e:
raise StorageReadWriteError(e) from e
if self.file_exists():
with open(self.path, "r", encoding='utf-8') as f:
self.raw = f.read()
@ -74,28 +78,6 @@ class WalletStorage(Logger):
def read(self):
return self.decrypted if self.is_encrypted() else self.raw
@classmethod
def _test_read_write_permissions(cls, path):
# note: There might already be a file at 'path'.
# Make sure we do NOT overwrite/corrupt that!
temp_path = "%s.tmptest.%s" % (path, os.getpid())
echo = "fs r/w test"
try:
# test READ permissions for actual path
if os.path.exists(path):
with open(path, "r", encoding='utf-8') as f:
f.read(1) # read 1 byte
# test R/W sanity for "similar" path
with open(temp_path, "w", encoding='utf-8') as f:
f.write(echo)
with open(temp_path, "r", encoding='utf-8') as f:
echo2 = f.read()
os.remove(temp_path)
except Exception as e:
raise StorageReadWriteError(e) from e
if echo != echo2:
raise StorageReadWriteError('echo sanity-check failed')
@profiler
def write(self, data):
s = self.encrypt_before_writing(data)

22
electrum/util.py

@ -1465,3 +1465,25 @@ def random_shuffled_copy(x: Iterable[T]) -> List[T]:
x_copy = list(x) # copy
random.shuffle(x_copy) # shuffle in-place
return x_copy
def test_read_write_permissions(path) -> None:
# note: There might already be a file at 'path'.
# Make sure we do NOT overwrite/corrupt that!
temp_path = "%s.tmptest.%s" % (path, os.getpid())
echo = "fs r/w test"
try:
# test READ permissions for actual path
if os.path.exists(path):
with open(path, "rb") as f:
f.read(1) # read 1 byte
# test R/W sanity for "similar" path
with open(temp_path, "w", encoding='utf-8') as f:
f.write(echo)
with open(temp_path, "r", encoding='utf-8') as f:
echo2 = f.read()
os.remove(temp_path)
except Exception as e:
raise IOError(e) from e
if echo != echo2:
raise IOError('echo sanity-check failed')

Loading…
Cancel
Save