Compare commits

...

4 Commits

Author SHA1 Message Date
qiangyanwen 1225366bed add test 2022-12-26 18:02:24 +08:00
qiangyanwen 6cd92aebe4 add test 2022-12-26 16:47:34 +08:00
qiangyanwen 378bf3632b add test 2022-12-26 16:39:13 +08:00
qiangyanwen e24619aa81 add test 2022-12-26 15:26:06 +08:00
4 changed files with 116 additions and 3 deletions

View File

@ -5,11 +5,9 @@
# @File :main.py.py
import uvicorn
from app import create_app
from config import Debug
from middleware.http_middleware import get_body
from utils.system import host
from config import settings
from fastapi.requests import Request
from fastapi import status

59
poetry.lock generated
View File

@ -77,6 +77,25 @@ category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "cryptography"
version = "38.0.4"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
cffi = ">=1.12"
[package.extras]
docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"]
docstest = ["pyenchant (>=1.6.11)", "twine (>=1.12.0)", "sphinxcontrib-spelling (>=4.0.1)"]
pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"]
sdist = ["setuptools-rust (>=0.11.4)"]
ssh = ["bcrypt (>=3.1.5)"]
test = ["pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-subtests", "pytest-xdist", "pretend", "iso8601", "pytz", "hypothesis (>=1.11.4,!=3.79.2)"]
[[package]]
name = "dnspython"
version = "2.2.1"
@ -246,6 +265,26 @@ category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "paramiko"
version = "2.12.0"
description = "SSH2 protocol library"
category = "main"
optional = false
python-versions = "*"
[package.dependencies]
bcrypt = ">=3.1.3"
cryptography = ">=2.5"
pynacl = ">=1.0.1"
six = "*"
[package.extras]
all = ["pyasn1 (>=0.1.7)", "pynacl (>=1.0.1)", "bcrypt (>=3.1.3)", "invoke (>=1.3)", "gssapi (>=1.4.1)", "pywin32 (>=2.1.8)"]
ed25519 = ["pynacl (>=1.0.1)", "bcrypt (>=3.1.3)"]
gssapi = ["pyasn1 (>=0.1.7)", "gssapi (>=1.4.1)", "pywin32 (>=2.1.8)"]
invoke = ["invoke (>=1.3)"]
[[package]]
name = "passlib"
version = "1.7.4"
@ -314,6 +353,21 @@ python-versions = ">=3.6"
ed25519 = ["PyNaCl (>=1.4.0)"]
rsa = ["cryptography"]
[[package]]
name = "pynacl"
version = "1.5.0"
description = "Python binding to the Networking and Cryptography (NaCl) library"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
cffi = ">=1.4.1"
[package.extras]
docs = ["sphinx (>=1.6.5)", "sphinx-rtd-theme"]
tests = ["pytest (>=3.2.1,!=3.3.0)", "hypothesis (>=3.27.0)"]
[[package]]
name = "python-dotenv"
version = "0.21.0"
@ -524,7 +578,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "8848d505f090a9da65a0a5a7b924dd04f043525a4423e15e55cf14f99cceaacc"
content-hash = "c44d226c5c05e73c9a3b2ab07465eafe3874e820049cfa63ea29bc4380a9ff5c"
[metadata.files]
alembic = [
@ -608,6 +662,7 @@ colorama = [
{file = "colorama-0.4.4-py2.py3-none-any.whl", hash = "sha256:9f47eda37229f68eee03b24b9748937c7dc3868f906e8ba69fbcbdd3bc5dc3e2"},
{file = "colorama-0.4.4.tar.gz", hash = "sha256:5941b2b48a20143d2267e95b1c2a7603ce057ee39fd88e7329b0c292aa16869b"},
]
cryptography = []
dnspython = []
ecdsa = []
email-validator = []
@ -698,6 +753,7 @@ mako = [
{file = "Mako-1.2.0.tar.gz", hash = "sha256:9a7c7e922b87db3686210cf49d5d767033a41d4010b284e747682c92bddd8b39"},
]
markupsafe = []
paramiko = []
passlib = [
{file = "passlib-1.7.4-py2.py3-none-any.whl", hash = "sha256:aa6bca462b8d8bda89c70b382f0c298a20b5560af6cbfa2dce410c0a2fb669f1"},
{file = "passlib-1.7.4.tar.gz", hash = "sha256:defd50f72b65c5402ab2c573830a6978e5f202ad0d984793c8dde2c4152ebe04"},
@ -760,6 +816,7 @@ pymysql = [
{file = "PyMySQL-1.0.2-py3-none-any.whl", hash = "sha256:41fc3a0c5013d5f039639442321185532e3e2c8924687abe6537de157d403641"},
{file = "PyMySQL-1.0.2.tar.gz", hash = "sha256:816927a350f38d56072aeca5dfb10221fe1dc653745853d30a216637f5d7ad36"},
]
pynacl = []
python-dotenv = []
python-jose = [
{file = "python-jose-3.3.0.tar.gz", hash = "sha256:55779b5e6ad599c6336191246e95eb2293a9ddebd555f796a65f838f07e5d78a"},

View File

@ -18,6 +18,7 @@ passlib = "^1.7.4"
email_validator = "^1.3.0"
pymysql = "^1.0.2"
psutil = "^5.9.4"
paramiko = "^2.12.0"
[tool.poetry.dev-dependencies]

57
utils/ssh_client.py Normal file
View File

@ -0,0 +1,57 @@
# _*_ coding: utf-8 _*_
# @Time :2022/12/26 16:33
# @Email :508737091@qq.com
# @Author :qiangyanwen
# @File :ssh_client.py
import paramiko
class SSHConnection(object):
def __init__(self, host, port, username, password):
self._host = host
self._port = port
self._username = username
self._password = password
self._transport = None
self._sftp = None
self._client = None
def __enter__(self):
transport = paramiko.Transport((self._host, self._port))
transport.connect(username=self._username, password=self._password)
self._transport = transport
return self
def download(self, remote_path, local_path):
if self._sftp is None:
self._sftp = paramiko.SFTPClient.from_transport(self._transport)
self._sftp.get(remote_path, local_path)
def put(self, local_path, remote_path):
if self._sftp is None:
self._sftp = paramiko.SFTPClient.from_transport(self._transport)
self._sftp.put(local_path, remote_path)
def exec_command(self, command):
if self._client is None:
self._client = paramiko.SSHClient()
self._client._transport = self._transport
stdin, stdout, stderr = self._client.exec_command(command)
out_data = stdout.read().decode('utf-8').strip()
code = stdout.channel.recv_exit_status()
if len(out_data) > 0:
return out_data
err_data = stderr.read().decode('utf-8').strip()
if len(err_data) > 0:
return err_data
return code
def __exit__(self, exc_type, exc_val, exc_tb):
if self._transport:
self._transport.close()
if self._client:
self._client.close()
with SSHConnection("47.96.135.132", 22, "root", "Qyw1994@520") as ssh:
ls = ssh.exec_command("ls -l")