Note
This legacy documentation does not necessarily reflect the current practices in MongoDB driver implementation, but may be useful for historical purposes. See the MongoDB Meta Driver for the current documentation of MongoDB driver implementation.
Write operations can be sent to the database asynchronously or synchronously. A synchronous write is one that is followed by a getLastError request.
When writing asynchronously, be careful to keep using the same connection (socket) for the operations that follow. This ensures that the next operation will not begin until after the write completes.
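The difference between the two write modes can be sketched as follows. This is a minimal illustration, not driver code: `FakeSocket` and `write` are hypothetical names, the messages are plain strings rather than wire-protocol messages, and the fake socket simply acknowledges whatever was sent last.

```python
class FakeSocket:
    """Hypothetical stand-in for a server socket; it only records messages."""

    def __init__(self):
        self.sent = []

    def send(self, message):
        self.sent.append(message)

    def receive(self):
        # Pretend the server acknowledges the most recent message.
        return {"ok": 1, "reply_to": self.sent[-1]}


def write(sock, message, synchronous=False):
    """Send a write; if synchronous, follow it with getLastError on the same socket."""
    sock.send(message)
    if not synchronous:
        # Asynchronous write: nothing is read back from the server.
        return None
    # Synchronous write: the getLastError request goes out on the SAME socket,
    # so its reply refers to the write that was just sent.
    sock.send("getLastError")
    return sock.receive()


sock = FakeSocket()
async_result = write(sock, "insert {x: 1}")
sync_result = write(sock, "insert {x: 2}", synchronous=True)
```

Both calls reuse `sock`, which is the point of the advice above: an asynchronous write followed by another operation on a different socket would have no ordering guarantee.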
An individual socket connection to the database has associated authentication state. Thus, if you pool connections, you probably want a separate pool for each authentication case (database + username).
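One way to keep a separate pool per authentication case is to key the idle sockets by (database, username). The sketch below assumes a hypothetical `PoolRegistry` class and a caller-supplied `make_socket` factory; it is not thread-safe and omits pool size limits, so treat it as an illustration of the keying only.

```python
from collections import defaultdict


class PoolRegistry:
    """Hypothetical registry holding one idle-socket list per authentication case."""

    def __init__(self, make_socket):
        self._make_socket = make_socket
        # (database, username) -> list of idle sockets authenticated for that case
        self._idle = defaultdict(list)

    def acquire(self, database, username):
        key = (database, username)
        if self._idle[key]:
            return self._idle[key].pop()
        # No idle socket for this case: open (and authenticate) a new one.
        return self._make_socket(database, username)

    def release(self, database, username, sock):
        self._idle[(database, username)].append(sock)


created = []


def make_socket(database, username):
    # Stand-in for opening a socket and authenticating on it.
    sock = {"id": len(created), "auth": (database, username)}
    created.append(sock)
    return sock


registry = PoolRegistry(make_socket)
a = registry.acquire("app", "alice")
registry.release("app", "alice", a)
b = registry.acquire("app", "alice")  # reuses the socket released above
c = registry.acquire("app", "bob")    # different case, so a new socket
```

A socket authenticated for one case is never handed to a caller expecting another, at the cost of holding more idle sockets overall.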
The following pseudo-code illustrates a recommended approach to implementing connection pooling in a driver’s connection class. This handles authentication, grouping operations from a single “request” onto the same socket, and a couple of other gotchas:
class Connection:
    init(pool_size, addresses, auto_start_requests):
        this.pool_size = pool_size
        this.addresses = addresses
        this.auto_start_requests = auto_start_requests
        this.thread_map = {}
        this.locks = Lock[pool_size]
        this.sockets = Socket[pool_size]
        this.socket_auth = String[pool_size][]
        this.auth = {}

        this.find_master()

    find_master():
        for address in this.addresses:
            if address.is_master():
                this.master = address

    pick_and_acquire_socket():
        choices = random permutation of [0, ..., this.pool_size - 1]

        # prefer the sockets that the fewest threads are currently mapped to
        choices.sort(order: ascending,
                     value: size of preimage of choice under this.thread_map)

        for choice in choices:
            if this.locks[choice].non_blocking_acquire():
                return choice

        # every socket is busy: wait for the least-loaded one
        sock = choices[0]
        this.locks[sock].blocking_acquire()
        return sock

    get_socket():
        # a value of -1 means a request was started but no socket is assigned yet
        if current_thread in this.thread_map and this.thread_map[current_thread] >= 0:
            sock_number = this.thread_map[current_thread]
            this.locks[sock_number].blocking_acquire()
        else:
            sock_number = this.pick_and_acquire_socket()
            if this.auto_start_requests or current_thread in this.thread_map:
                this.thread_map[current_thread] = sock_number

        if not this.sockets[sock_number]:
            this.sockets[sock_number] = Socket(this.master)

        return sock_number

    send_message_without_response(message):
        sock_number = this.get_socket()
        this.check_auth(sock_number)
        this.sockets[sock_number].send(message)
        this.locks[sock_number].release()

    send_message_with_response(message):
        sock_number = this.get_socket()
        this.check_auth(sock_number)
        this.sockets[sock_number].send(message)
        result = this.sockets[sock_number].receive()
        this.locks[sock_number].release()
        return result

    # start_request is only needed if auto_start_requests is False
    start_request():
        this.thread_map[current_thread] = -1

    end_request():
        delete this.thread_map[current_thread]

    authenticate(database, username, password):
        # TODO should probably make sure that these credentials are valid,
        # otherwise errors are going to be delayed until first op.
        this.auth[database] = (username, password)

    logout(database):
        delete this.auth[database]

    check_auth(sock_number):
        # iterate over a copy, since entries are removed inside the loop
        for db in copy of this.socket_auth[sock_number]:
            if db not in this.auth.keys():
                this.sockets[sock_number].send(logout_message)
                this.socket_auth[sock_number].remove(db)
        for db in this.auth.keys():
            if db not in this.socket_auth[sock_number]:
                this.sockets[sock_number].send(authenticate_message)
                this.socket_auth[sock_number].append(db)

# Somewhere we need to do error checking: if you get "not master", then everything
# in this.sockets gets closed and set to null, and we call find_master() again.
# We also need to reset the socket_auth information, since nothing is authorized
# yet on the new master.
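The socket selection step in pick_and_acquire_socket is the least obvious part of the pseudo-code, so here is a runnable Python sketch of it. The free function `pick_and_acquire` and its parameters are assumptions made for illustration; it uses `threading.Lock` from the standard library and takes the "size of preimage" to mean the number of threads currently mapped to each socket.

```python
import random
import threading
from collections import Counter


def pick_and_acquire(locks, thread_map):
    """Return the index of a locked socket, preferring the least-used ones."""
    # Count how many threads are mapped to each socket (-1 means "none yet").
    load = Counter(n for n in thread_map.values() if n >= 0)

    choices = list(range(len(locks)))
    random.shuffle(choices)
    # list.sort is stable, so equally loaded sockets stay in random order.
    choices.sort(key=lambda n: load[n])

    # First pass: take any socket that is free right now.
    for choice in choices:
        if locks[choice].acquire(blocking=False):
            return choice

    # Everything is busy: block on the least-loaded socket.
    locks[choices[0]].acquire()
    return choices[0]


locks = [threading.Lock() for _ in range(3)]
thread_map = {"t1": 0, "t2": 0, "t3": 1}  # socket 0 has two threads, socket 1 has one

first = pick_and_acquire(locks, thread_map)   # socket 2 is unused, so it wins
second = pick_and_acquire(locks, thread_map)  # socket 2 is now locked; 1 is next
third = pick_and_acquire(locks, thread_map)   # only socket 0 is left
```

Shuffling before the sort spreads threads across equally loaded sockets instead of always favoring the lowest index.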
See also