tfidf

予早 2026-04-30 23:53:31
Categories: Tags:
def get_company_by_company_full_name_tfidf(company_full_name: str, limit: int, threshold: float) -> list[dict]:
    """
    Return the companies whose full name is most similar to ``company_full_name``.

    On every call the company code / full name pairs are queried from the database, a
    TF-IDF model (jieba tokenization) is fitted over those names plus the query name, and
    the stored names are ranked by cosine similarity to the query.

    :param company_full_name: query company name; must be non-empty
    :param limit: maximum number of results to return; must not exceed 300
    :param threshold: minimum cosine similarity a result must reach; ``None`` disables the filter
    :return: list of dicts with keys ``company_code``, ``company_full_name`` and
        ``cosine_similarity``, sorted by similarity in descending order
    :raises DefaultException: if ``company_full_name`` is empty or ``limit`` is greater than 300
    """
    if not company_full_name:
        raise DefaultException(sc.E_PARAM, "company_full_name is empty")
    # The API only allows the top 300 results to be returned.
    if limit > 300:
        raise DefaultException(sc.E_PARAM, "limit is too large")

    # NOTE(review): presumably the two None arguments mean "no filter" so every company is
    # selected -- confirm against sql_alchemy_utils.gen_query_statement.
    rows = db.session.execute(
        sql_alchemy_utils.gen_query_statement(
            Company, None, None, ['company_code', 'company_full_name'])).all()
    # Keep only rows whose code AND name are both non-empty. The predicate must look at the
    # row being filtered; testing rows[0]/rows[1] never drops anything and raises IndexError
    # when fewer than two rows exist.
    code_to_name = {code: name for code, name in rows if code and name}
    if not code_to_name:
        return []

    # Fit on the stored names plus the query so all vectors share one vocabulary;
    # the query is the last row of the matrix.
    names = list(code_to_name.values()) + [company_full_name]
    tfidf_matrix = TfidfVectorizer(tokenizer=jieba.lcut, token_pattern=None).fit_transform(names)
    cs_values = cosine_similarity(tfidf_matrix[:-1], tfidf_matrix[-1:]).flatten()

    # Dict order matches the order of `names`, so scores line up with items positionally.
    data = [
        {"company_code": code, "company_full_name": name, "cosine_similarity": score}
        for (code, name), score in zip(code_to_name.items(), cs_values)
    ]

    if threshold is not None:
        data = [item for item in data if item['cosine_similarity'] >= threshold]
    data.sort(key=lambda item: item['cosine_similarity'], reverse=True)
    return data[:limit]
def get_company_by_company_full_name_tfidf(company_full_name: str, limit: int, threshold: float) -> list[dict]:
    """
    Return the companies whose full name is most similar to ``company_full_name``.

    Unlike fitting a model per request, this version caches the fitted vectorizer, the
    TF-IDF matrix of the stored names, and the code -> name mapping as dill files under
    ``/tmp``. When all three files exist they are loaded; otherwise the cache is rebuilt
    from the database while holding ``tfidf_lock``.

    :param company_full_name: query company name; must be non-empty
    :param limit: maximum number of results to return; must not exceed 300
    :param threshold: minimum cosine similarity a result must reach; ``None`` disables the filter
    :return: list of dicts with keys ``company_code``, ``company_full_name`` and
        ``cosine_similarity``, sorted by similarity in descending order
    :raises DefaultException: if ``company_full_name`` is empty, ``limit`` is greater than
        300, or another caller currently holds ``tfidf_lock`` while rebuilding the cache
    """
    if not company_full_name:
        raise DefaultException(sc.E_PARAM, "company_full_name is empty")
    # The API only allows the top 300 results to be returned.
    if limit > 300:
        raise DefaultException(sc.E_PARAM, "limit is too large")

    tfidf_vectorizer_fn = '/tmp/tfidf_vectorizer4company_name.dill'
    tfidf_matrix_fn = '/tmp/tfidf_matrix.dill'
    d_company_code2name_fn = '/tmp/d_company_code.dill'
    cache_files = (tfidf_vectorizer_fn, tfidf_matrix_fn, d_company_code2name_fn)

    if all(os.path.exists(path) for path in cache_files):
        # NOTE(review): dill.load executes arbitrary code from the file, and /tmp is usually
        # writable by any local user -- consider a private cache directory.
        # Context managers make sure the file handles are closed even if loading fails.
        with open(tfidf_vectorizer_fn, mode='rb') as f:
            vectorizer: TfidfVectorizer = dill.load(f)
        with open(tfidf_matrix_fn, mode='rb') as f:
            tfidf_matrix = dill.load(f)
        with open(d_company_code2name_fn, mode='rb') as f:
            d_company_code2name = dill.load(f)
    else:
        # Non-blocking acquire: if another caller is already rebuilding, fail fast
        # instead of queueing up a second rebuild.
        if not tfidf_lock.acquire(False):
            raise DefaultException(sc.E_PARAM, "数据处理中")
        try:
            # NOTE(review): presumably the two None arguments mean "no filter" so every
            # company is selected -- confirm against sql_alchemy_utils.gen_query_statement.
            rows = db.session.execute(
                sql_alchemy_utils.gen_query_statement(
                    Company, None, None, ['company_code', 'company_full_name'])).all()
            # Keep only rows whose code AND name are both non-empty. The predicate must look
            # at the row being filtered; testing rows[0]/rows[1] never drops anything and
            # raises IndexError when fewer than two rows exist.
            d_company_code2name = {code: name for code, name in rows if code and name}
            if not d_company_code2name:
                return []
            names = list(d_company_code2name.values())
            # tfidf
            vectorizer = TfidfVectorizer(tokenizer=jieba.cut, token_pattern=None)
            tfidf_matrix = vectorizer.fit_transform(names)
            for obj, path in zip((vectorizer, tfidf_matrix, d_company_code2name), cache_files):
                with open(path, mode='wb') as f:
                    dill.dump(obj, f)
        finally:
            tfidf_lock.release()

    # Project the query into the cached vocabulary and score it against every stored name.
    tfidf_matrix_input = vectorizer.transform([company_full_name])
    cs_values = cosine_similarity(tfidf_matrix, tfidf_matrix_input).flatten()

    # Dict order matches the row order of tfidf_matrix, so scores line up positionally.
    data = [
        {"company_code": code, "company_full_name": name, "cosine_similarity": score}
        for (code, name), score in zip(d_company_code2name.items(), cs_values)
    ]

    if threshold is not None:
        data = [item for item in data if item['cosine_similarity'] >= threshold]
    data.sort(key=lambda item: item['cosine_similarity'], reverse=True)
    return data[:limit]
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Demo corpus: two sample sentences repeated 200,000 times each, plus the query as the last row.
names = ["我喜欢看电影", "我喜欢看电影和听音乐"] * 200000 + ["看电影和听"]
# token_pattern=None: the default pattern is ignored when a custom tokenizer is supplied,
# and leaving it set makes scikit-learn warn that it will not be used.
tfidf_matrix = TfidfVectorizer(tokenizer=jieba.cut, token_pattern=None).fit_transform(names)

# Cosine similarity of every corpus sentence against the query (the last row).
print(list(cosine_similarity(tfidf_matrix[:-1], tfidf_matrix[-1:]).flatten()))