def get_company_by_company_full_name_tfidf(company_full_name: str, limit: int, threshold: float) -> list[dict]:
    """
    Return the companies whose full names are most similar to ``company_full_name``.

    All (company_code, company_full_name) pairs are loaded from the database, a
    TF-IDF model is fitted over those names plus the query name (tokenized with
    jieba), and every stored name is scored by its cosine similarity to the query.

    NOTE(review): this file contains a second definition with the same name
    further down; if both live in the same module, the later one wins.

    :param company_full_name: name to search for; must be non-empty
    :param limit: maximum number of results to return; values above 300 are rejected
    :param threshold: minimum cosine similarity a result must reach;
        ``None`` disables the filter
    :return: list of dicts with keys ``company_code``, ``company_full_name`` and
        ``cosine_similarity``, sorted by similarity in descending order and
        truncated to ``limit`` entries; empty if no usable company rows exist
    :raises DefaultException: with ``sc.E_PARAM`` when ``company_full_name`` is
        empty or ``limit`` is greater than 300
    """
    if not company_full_name:
        raise DefaultException(sc.E_PARAM, "company_full_name is empty")
    # The API only allows the top 300 results to be returned.
    if limit > 300:
        raise DefaultException(sc.E_PARAM, "limit is too large")

    rows = db.session.execute(
        sql_alchemy_utils.gen_query_statement(
            Company, None, None, ['company_code', 'company_full_name'])).all()
    # Keep only rows where both the code and the name are non-empty. The check
    # must look at each row's own fields, not at the first two rows of the
    # result set.
    code2name = {code: name for code, name in rows if code and name}
    if not code2name:
        return []

    # TF-IDF: the query is appended as the last document so it shares the
    # vocabulary and IDF weights with the stored names.
    names = list(code2name.values()) + [company_full_name]
    tfidf_matrix = TfidfVectorizer(tokenizer=jieba.lcut, token_pattern=None).fit_transform(names)
    # Similarity of every stored name (all rows but the last) to the query (last row).
    cs_values = list(cosine_similarity(tfidf_matrix[:-1], tfidf_matrix[-1:]).flatten())

    data = [
        {"company_code": code, "company_full_name": name, "cosine_similarity": score}
        for (code, name), score in zip(code2name.items(), cs_values)
    ]
    if threshold is not None:
        data = [item for item in data if item['cosine_similarity'] >= threshold]
    data = sorted(data, key=lambda item: item['cosine_similarity'], reverse=True)
    return data[:limit]
def get_company_by_company_full_name_tfidf(company_full_name: str, limit: int, threshold: float) -> list[dict]:
    """
    Return the companies whose full names are most similar to ``company_full_name``.

    The fitted TF-IDF vectorizer, the TF-IDF matrix of all stored company names
    and the code -> name mapping are cached as dill files under ``/tmp``. When
    all three files exist they are loaded; otherwise they are rebuilt from the
    database while holding ``tfidf_lock`` and written back to disk. The query
    name is then transformed with the vectorizer and scored against every
    stored name by cosine similarity.

    :param company_full_name: name to search for; must be non-empty
    :param limit: maximum number of results to return; values above 300 are rejected
    :param threshold: minimum cosine similarity a result must reach;
        ``None`` disables the filter
    :return: list of dicts with keys ``company_code``, ``company_full_name`` and
        ``cosine_similarity``, sorted by similarity in descending order and
        truncated to ``limit`` entries; empty if no usable company rows exist
    :raises DefaultException: with ``sc.E_PARAM`` when ``company_full_name`` is
        empty, when ``limit`` is greater than 300, or when the cache has to be
        rebuilt but ``tfidf_lock`` could not be acquired
    """
    if not company_full_name:
        raise DefaultException(sc.E_PARAM, "company_full_name is empty")
    # The API only allows the top 300 results to be returned.
    if limit > 300:
        raise DefaultException(sc.E_PARAM, "limit is too large")

    tfidf_vectorizer_fn = '/tmp/tfidf_vectorizer4company_name.dill'
    tfidf_matrix_fn = '/tmp/tfidf_matrix.dill'
    d_company_code2name_fn = '/tmp/d_company_code.dill'
    cache_files = (tfidf_vectorizer_fn, tfidf_matrix_fn, d_company_code2name_fn)

    if all(os.path.exists(fn) for fn in cache_files):
        # NOTE(review): dill.load can execute arbitrary code while unpickling and
        # these files live under /tmp -- confirm nothing untrusted can write there.
        with open(tfidf_vectorizer_fn, mode='rb') as f:
            vectorizer: TfidfVectorizer = dill.load(f)
        with open(tfidf_matrix_fn, mode='rb') as f:
            tfidf_matrix = dill.load(f)
        with open(d_company_code2name_fn, mode='rb') as f:
            d_company_code2name = dill.load(f)
    else:
        # acquire(False): do not wait for the lock; if it is not obtained, tell
        # the caller that the data is still being processed.
        if not tfidf_lock.acquire(False):
            raise DefaultException(sc.E_PARAM, "数据处理中")
        try:
            rows = db.session.execute(
                sql_alchemy_utils.gen_query_statement(
                    Company, None, None, ['company_code', 'company_full_name'])).all()
            # Keep only rows where both the code and the name are non-empty. The
            # check must look at each row's own fields, not at the first two
            # rows of the result set.
            d_company_code2name = {code: name for code, name in rows if code and name}
            if not d_company_code2name:
                return []
            names = list(d_company_code2name.values())
            # tfidf
            vectorizer = TfidfVectorizer(tokenizer=jieba.cut, token_pattern=None)
            tfidf_matrix = vectorizer.fit_transform(names)
            # Write each cache file to a temporary path and rename it into
            # place: the loading branch above only checks os.path.exists and
            # does not take the lock, so a file must never be visible under its
            # final name while it is still being written.
            for obj, fn in ((vectorizer, tfidf_vectorizer_fn),
                            (tfidf_matrix, tfidf_matrix_fn),
                            (d_company_code2name, d_company_code2name_fn)):
                tmp_fn = f'{fn}.{os.getpid()}.tmp'
                with open(tmp_fn, mode='wb') as f:
                    dill.dump(obj, f)
                os.replace(tmp_fn, fn)
        finally:
            tfidf_lock.release()

    # Score every stored name against the query using the shared vocabulary.
    tfidf_matrix_input = vectorizer.transform([company_full_name])
    cs_values = list(cosine_similarity(tfidf_matrix, tfidf_matrix_input).flatten())
    data = [
        {"company_code": code, "company_full_name": name, "cosine_similarity": score}
        for (code, name), score in zip(d_company_code2name.items(), cs_values)
    ]
    if threshold is not None:
        data = [item for item in data if item['cosine_similarity'] >= threshold]
    data = sorted(data, key=lambda item: item['cosine_similarity'], reverse=True)
    return data[:limit]
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Scratch benchmark: fit TF-IDF over 400,000 short jieba-tokenized documents
# plus one query document, then print the query's cosine similarity to each.
names = ["我喜欢看电影", "我喜欢看电影和听音乐"] * 200000 + ["看电影和听"]
# token_pattern=None because a custom tokenizer is supplied; leaving the default
# pattern in place makes scikit-learn warn that it will not be used.
tfidf_matrix = TfidfVectorizer(tokenizer=jieba.cut, token_pattern=None).fit_transform(names)
# Last row is the query; all preceding rows are the documents it is compared to.
print(list(cosine_similarity(tfidf_matrix[:-1], tfidf_matrix[-1:]).flatten()))