[Figure: mind map of the features the scanner needs to implement]

Example code for a SQL injection detection plugin in Python

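Crawler design

First we need a crawler that collects the site's links. It has to track which links have already been crawled and which are still pending, and it has to deduplicate them; Python's set() handles that. The rough flow is:

  • Take a URL as input
  • Download the page and parse the URLs out of it
  • Deduplicate the URLs and check whether they belong to this site
  • Add them to the pending list
  • Repeat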
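The flow above can be sketched without any network access. This is a minimal illustration, not the article's final crawler: FAKE_SITE, same_site and crawl are hypothetical names, the "site" is an in-memory dictionary, and the same-site test compares host names, which is an assumption about what "belongs to this site" should mean.

```python
from urllib.parse import urlparse

# Hypothetical in-memory "site": page URL -> links found on that page.
FAKE_SITE = {
    "http://example.test/": ["http://example.test/a", "http://example.test/b", "http://other.test/"],
    "http://example.test/a": ["http://example.test/", "http://example.test/b"],
    "http://example.test/b": ["http://example.test/c?id=1"],
    "http://example.test/c?id=1": [],
}

def same_site(root, url):
    """True when url has the same host as root."""
    return urlparse(url).netloc == urlparse(root).netloc

def crawl(root, get_links):
    pending = {root}   # URLs waiting to be crawled
    seen = set()       # URLs already crawled
    while pending:
        url = pending.pop()
        seen.add(url)
        for link in get_links(url):
            # The sets deduplicate; off-site links are dropped here.
            if same_site(root, link) and link not in seen:
                pending.add(link)
    return seen

visited = crawl("http://example.test/", lambda u: FAKE_SITE.get(u, []))
print(sorted(visited))
```

Because both collections are sets, a link that appears on several pages is queued only once, and the loop ends when nothing is pending.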

SQL injection detection approach

  • Append AND %d=%d or OR NOT (%d=%d) to the URL
  • The numbers substituted for %d are random
  • Then search the returned page for DBMS-specific keywords, for example:

MySQL: SQL syntax.*MySQL
Microsoft SQL Server: Warning.*mssql_
Microsoft Access: Microsoft Access Driver
Oracle: Oracle error
IBM DB2: DB2 SQL error
SQLite: SQLite.Exception
...

These keywords identify the database in use.

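  • We also need to check for a WAF or similar protection and stop immediately if one is present. A simple way is to request a specially crafted URL: if keywords such as IP banned or firewall appear in the response, a WAF is likely in place. In practice the match is done with a regular expression.

Install these libraries: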

    pip install requests
    pip install beautifulsoup4

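The lab environment is Linux. Create a directory named Code, create a folder named work inside it, and use that as the working directory.

Directory structure

    /w8ay.py  // main entry point of the project
    /lib/core // core files
    /lib/core/config.py // configuration file
    /script   // plugins
    /exp      // exploits and PoCs

Steps

Writing the SQL detection script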

    DBMS_ERRORS = {
      "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
      "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
      "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}"),
      "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
      "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
      "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
      "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
      "Sybase": (r"Sybase message", r"Sybase.*Server message.*"),
    }

Matching these regular expressions against the response tells us which database is behind the site:

    for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
      if (re.search(regex,_content)):
        return True
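The loop above only reports that some pattern matched. If the name of the database is wanted as well, a variant along the following lines could return it. This is a sketch under assumptions: identify_dbms is a hypothetical helper, the table is a small subset of the patterns listed above, and the sample error text is made up for illustration.

```python
import re

# A subset of the patterns listed in the article, one or two per DBMS.
DBMS_ERRORS = {
    "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*"),
    "Microsoft Access": (r"Microsoft Access Driver",),
    "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error"),
    "SQLite": (r"SQLite.Exception", r"\[SQLITE_ERROR\]"),
}

def identify_dbms(content):
    """Return the first DBMS whose error pattern matches, else None."""
    if not content:
        return None
    for dbms, patterns in DBMS_ERRORS.items():
        for pattern in patterns:
            if re.search(pattern, content):
                return dbms
    return None

# Made-up error page text, used only to exercise the function.
sample = "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version"
print(identify_dbms(sample))
```

Returning None for an empty or missing page keeps the helper safe to call on a failed download.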

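Below are the payloads used as test statements:

    BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")

Compare the page returned for the true condition with the page returned for the false condition: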

    for test_payload in BOOLEAN_TESTS:
      # Right Page
      RANDINT = random.randint(1, 255)
      _url = url + test_payload % (RANDINT, RANDINT)
      content["true"] = Downloader.get(_url)
      _url = url + test_payload % (RANDINT, RANDINT + 1)
      content["false"] = Downloader.get(_url)
      if content["origin"] == content["true"] != content["false"]:
        return "sql found: %s" % url

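The expression

    content["origin"] == content["true"] != content["false"]

means that when the original page equals the true-condition page and differs from the false-condition page, the URL can be judged to have an injection vulnerability.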
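To see the comparison work without touching a real site, the download step can be replaced by a simulation. Everything below is an assumption made for illustration: fake_vulnerable_get and fake_safe_get are hypothetical stand-ins for the article's downloader, and the page bodies are invented.

```python
import random
import re

BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")

def fake_vulnerable_get(url):
    """Hypothetical stand-in for a download: pretends the id parameter
    is concatenated into a WHERE clause without escaping."""
    m = re.search(r" (AND|OR NOT) \(?(\d+)=(\d+)\)?$", url)
    if m is None:
        return "<p>item 1</p>"                 # original page
    op, a, b = m.group(1), int(m.group(2)), int(m.group(3))
    equal = (a == b)
    if op == "AND":
        return "<p>item 1</p>" if equal else "<p>no results</p>"
    # OR NOT (a=b): a false condition adds nothing, a true one returns every row
    return "<p>item 1</p>" if equal else "<p>item 1</p><p>item 2</p>"

def fake_safe_get(url):
    """Hypothetical stand-in for a page that ignores the injected text."""
    return "<p>item 1</p>"

def boolean_check(url, get):
    origin = get(url)
    for payload in BOOLEAN_TESTS:
        n = random.randint(1, 255)
        true_page = get(url + payload % (n, n))        # condition holds
        false_page = get(url + payload % (n, n + 1))   # condition fails
        if origin == true_page != false_page:
            return True
    return False

print(boolean_check("http://example.test/item?id=1", fake_vulnerable_get))
print(boolean_check("http://example.test/item?id=1", fake_safe_get))
```

On real pages an exact string comparison can miss a vulnerable URL when the response contains content that changes between requests, such as timestamps, so treat a negative result with some caution.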

Full code:

    import re, random
    from lib.core import Downloader
    def sqlcheck(url):
      if "?" not in url: # no query string, e.g. a pseudo-static page
        return False
      downloader = Downloader.Downloader()
      BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
      DBMS_ERRORS = {
        # regular expressions used for DBMS recognition based on error message response
        "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
        "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
        "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}"),
        "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
        "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
        "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
        "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
        "Sybase": (r"Sybase message", r"Sybase.*Server message.*"),
      }
      # Error-based check: append )("' (URL-encoded) and look for DBMS error messages
      _url = url + "%29%28%22%27"
      _content = downloader.get(_url) or ""
      for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
        if (re.search(regex,_content)):
          return True
      # Boolean-based check: compare the original page with a true and a false condition
      content = {}
      content['origin'] = downloader.get(url)
      if content['origin'] is None: # original page could not be fetched
        return False
      for test_payload in BOOLEAN_TESTS:
        RANDINT = random.randint(1, 255)
        _url = url + test_payload % (RANDINT, RANDINT)
        content["true"] = downloader.get(_url)   # condition holds
        _url = url + test_payload % (RANDINT, RANDINT + 1)
        content["false"] = downloader.get(_url)  # condition fails
        if content["origin"] == content["true"] != content["false"]:
          return "sql found: %s" % url
      return False

Save this file as sqlcheck.py in the /script directory. Line 4 of the code filters out URLs without a query string, such as pseudo-static pages.

Writing the crawler

The crawler's approach was covered above. We start with URL management, which gets its own class, saved in /lib/core/UrlManager.py

    #-*- coding:utf-8 -*-
    
    class UrlManager(object):
      def __init__(self):
        self.new_urls = set()
        self.old_urls = set()
        
      def add_new_url(self, url):
        if url is None:
          return
        if url not in self.new_urls and url not in self.old_urls:
          self.new_urls.add(url)
       
      def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
          return
        for url in urls:
          self.add_new_url(url)
        
      def has_new_url(self):
        return len(self.new_urls) != 0
       
      def get_new_url(self):
        new_url = self.new_urls.pop()
        self.old_urls.add(new_url)
        return new_url

For convenience the download logic also gets its own class, saved in lib/core/Downloader.py

    #-*- coding:utf-8 -*-
    import requests
    
    class Downloader(object):
      def get(self, url):
        r = requests.get(url, timeout = 10)
        if r.status_code != 200:
          return None
        _str = r.text
        return _str
      
      def post(self, url, data):
        r = requests.post(url, data)
        _str = r.text
        return _str
      
      def download(self, url, htmls):
        if url is None:
          return None
        _str = {}
        _str["url"] = url
        try:
          r = requests.get(url, timeout = 10)
          if r.status_code != 200:
            return None
          _str["html"] = r.text
        except Exception as e:
          return None
        htmls.append(_str)

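Note that the crawler we are writing is multithreaded, so the class has a download method dedicated to that case: instead of returning the page, it appends the result to a list shared by the threads.

Write the crawler in lib/core/Spider.py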

    #-*- coding:utf-8 -*-
    
    from lib.core import Downloader, UrlManager
    from script import sqlcheck
    import threading
    from urllib.parse import urljoin
    from bs4 import BeautifulSoup
    
    class SpiderMain(object):
      def __init__(self, root, threadNum):
        self.urls = UrlManager.UrlManager()
        self.download = Downloader.Downloader()
        self.root = root
        self.threadNum = threadNum
      
      def _judge(self, domain, url):
        if (url.find(domain) != -1):
          return True
        return False
      
      def _parse(self, page_url, content):
        if content is None:
          return
        soup = BeautifulSoup(content, 'html.parser')
        _news = self._get_new_urls(page_url, soup)
        return _news
        
      def _get_new_urls(self, page_url, soup):
        new_urls = set()
        links = soup.find_all('a')
        for link in links:
          new_url = link.get('href')
          new_full_url = urljoin(page_url, new_url)
          if (self._judge(self.root, new_full_url)):
            new_urls.add(new_full_url)
        return new_urls
        
      def craw(self):
        self.urls.add_new_url(self.root)
        while self.urls.has_new_url():
          _content = []
          th = []
          for i in list(range(self.threadNum)):
            if self.urls.has_new_url() is False:
              break
            new_url = self.urls.get_new_url()
            
            ## sql check
            try:
              if (sqlcheck.sqlcheck(new_url)):
                print("url:%s sqlcheck is valueable" % new_url)
            except:
              pass
                
            print("craw:" + new_url)
            t = threading.Thread(target = self.download.download, args = (new_url, _content))
            t.start()
            th.append(t)
          for t in th:
            t.join()
          for _str in _content:
            if _str is None:
              continue
            new_urls = self._parse(_str["url"], _str["html"])
            self.urls.add_new_urls(new_urls)

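When craw() is called, the crawler starts from the root URL given to SpiderMain. It downloads the pending pages with multiple threads, parses each downloaded page with BeautifulSoup in _parse, and feeds the extracted URLs back into the URL manager. The loop repeats until there are no pages left to crawl, at which point the crawler stops.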
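The parse step can be tried in isolation. The sketch below swaps BeautifulSoup for the standard library's html.parser so that it runs without extra packages, and it compares host names instead of using the substring test in _judge. Both choices, along with the names LinkCollector and extract_same_site_links, are assumptions made for illustration.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse

class LinkCollector(HTMLParser):
    """Collects the href of every <a> tag."""
    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href:
                self.hrefs.append(href)

def extract_same_site_links(page_url, html, root):
    collector = LinkCollector()
    collector.feed(html)
    root_host = urlparse(root).netloc
    links = set()
    for href in collector.hrefs:
        full = urljoin(page_url, href)          # resolve relative links
        if urlparse(full).netloc == root_host:  # keep only same-host links
            links.add(full)
    return links

html = ('<a href="/post/1">one</a> <a href="page2.html">two</a> '
        '<a href="http://other.test/x">ext</a> <a>no href</a>')
links = extract_same_site_links("http://example.test/blog/index.html", html, "http://example.test")
print(sorted(links))
```

Relative links are resolved against the URL of the page they were found on, which is why the page URL has to travel with the downloaded HTML.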

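The number of threads is configurable through threadNum. Each started thread is given one URL to download; the main loop then blocks on join() until every thread in the batch has finished before it moves on.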
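The start-then-join pattern can be sketched on its own with a stub in place of the real download. fake_download and download_batch are hypothetical names, and the short sleep only simulates waiting for a server.

```python
import threading
import time

def fake_download(url, results):
    """Hypothetical stand-in for Downloader.download: no network access."""
    time.sleep(0.01)                      # pretend to wait for the server
    results.append({"url": url, "html": "<html>%s</html>" % url})

def download_batch(urls, thread_num):
    results = []
    pending = list(urls)
    while pending:
        # Take at most thread_num URLs and give each one its own thread.
        batch, pending = pending[:thread_num], pending[thread_num:]
        threads = [threading.Thread(target=fake_download, args=(u, results)) for u in batch]
        for t in threads:
            t.start()
        for t in threads:
            t.join()                      # block until the whole batch is done
    return results

pages = download_batch(["http://example.test/%d" % i for i in range(7)], thread_num=3)
print(len(pages))
```

Each batch waits for its slowest download, so one slow page holds up the others in that round; that is the price of keeping the loop this simple.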

Combining the crawler with the SQL check

In lib/core/Spider.py, import the plugin with from script import sqlcheck, and in craw() call it at the point where a new URL is taken out:

    ##sql check
    try:
      if(sqlcheck.sqlcheck(new_url)):
        print("url:%s sqlcheck is valueable"%new_url)
    except:
      pass

The try block catches any exception the check may raise and skips past it. Test it from w8ay.py:

    #-*- coding:utf-8 -*-
    '''
    Name: w8ayScan
    Author: mathor
    Copyright (c) 2019
    '''
    import sys
    from lib.core.Spider import SpiderMain
    def main():
      root = "https://wmathor.com"
      threadNum = 50
      w8 = SpiderMain(root, threadNum)
      w8.craw()
     
    if __name__ == "__main__":
      main()

One important point: for the .py files in the lib and script folders to be importable as modules, create an __init__.py file in each of lib, lib/core and script. The files can be empty.
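If you would rather not create those files by hand, a few lines of Python can do it. make_packages is a hypothetical helper, and the folder names are the ones from the directory structure above.

```python
from pathlib import Path

def make_packages(base):
    """Create lib, lib/core and script under base, each with an empty __init__.py."""
    created = []
    for name in ("lib", "lib/core", "script"):
        pkg = Path(base) / name
        pkg.mkdir(parents=True, exist_ok=True)   # create the folder if missing
        init = pkg / "__init__.py"
        init.touch(exist_ok=True)                # leaves an existing file untouched
        created.append(init)
    return created
```

Calling make_packages(".") from the working directory sets up all three packages; running it again is harmless.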

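Summary

SQL injection detection uses payloads that make the page fail: comparing the original page, the true-condition page and the false-condition page reveals whether an injection vulnerability exists.
Matching the SQL error messages in the response against regular expressions identifies the database in use.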

